service

package
v0.4.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 25, 2025 License: MIT Imports: 29 Imported by: 10

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrInvalidSignature = errors.New("invalid signature")
	ErrInvalidIdentity  = errors.New("invalid identity")
	ErrUnknownIdentity  = errors.New("unknown identity")
	ErrPubConnection    = errors.New("publishing NATS connection is nil")
	ErrSubConnection    = errors.New("subscribing NATS connection is nil")
	ErrReqConnection    = errors.New("request NATS connection is nil")
)
View Source
var ErrNotAvailable = fmt.Errorf("not available")

Functions

func WithCodec

func WithCodec(c options.Codec) options.Option

WithCodec will configure the codec.

func WithContext

func WithContext(ctx context.Context) options.Option

WithContext sets the context for the publisher.

func WithKnownIdentities

func WithKnownIdentities(ids ...string) options.Option

WithKnownIdentities will add known identities as base58 encoded ed25519 public keys. Only Messages from publishers identified by these keys will be accepted.

func WithKnownPublicKey

func WithKnownPublicKey(keys ...ed25519.PublicKey) options.Option

WithKnownPublicKey will add known public keys. Only Messages from publishers identified by these keys will be accepted.

func WithLogger

func WithLogger(logger *slog.Logger) options.Option

WithLogger sets the logger for the publisher.

func WithNKeySeed

func WithNKeySeed(contents string) options.Option

WithNKeySeed will decode decorated NATS NKey and use it for identity.

func WithName

func WithName(name string) options.Option

WithName sets name of the publisher. The subject is in the form of {prefix}.{name}. Subscriptions and publishing will use the subject constructed from prefix and name.

func WithNats

func WithNats(nc options.NatsConn) options.Option

WithNats sets up preconfigured NATS connector for publishing, subscribing, and request/reply.

func WithParam

func WithParam(key string, val any) options.Option

WithParam adds a key-value pair to the configuration of the publisher.

func WithPemPrivateKey

func WithPemPrivateKey(keyFile string) options.Option

WithPemPrivateKey will load ED25519 private key from a PEM file and use it for identity.

func WithPrefix

func WithPrefix(prefix string) options.Option

WithPrefix sets the prefix for the subject. The subject is in the form of {prefix}.{name}. Subscriptions and publishing will use the subject constructed from prefix and name.

func WithPrivateKey

func WithPrivateKey(pkey crypto.PrivateKey) options.Option

WithPrivateKey will configure identity using ED25519 crypto.PrivateKey.

func WithPubNats

func WithPubNats(nc options.NatsConn) options.Option

WithPubNats sets up preconfigured NATS connector specifically for publishing.

func WithPublishQueueSize

func WithPublishQueueSize(n int) options.Option

WithPublishQueueSize will configure the size of the publish queue.

func WithReqNats

func WithReqNats(nc options.NatsConn) options.Option

WithReqNats sets up preconfigured NATS connector specifically for request/reply.

func WithStreamName

func WithStreamName(name string) options.Option

WithStreamName sets name of the JetStream stream. If an empty string is used, then stream name will be "{prefix}-{name}".

func WithSubNats

func WithSubNats(nc options.NatsConn) options.Option

WithSubNats sets up preconfigured NATS connector specifically for subscribing.

NOTE: This will also set ReqNats for compatibility.

func WithTelemetryPeriod

func WithTelemetryPeriod(p time.Duration) options.Option

WithTelemetryPeriod will configure verbosity level.

func WithUserCreds

func WithUserCreds(path string) options.Option

WithUserCreds will load NKey from NATS User Credentials file and use it for identity.

func WithVerbose

func WithVerbose(v bool) options.Option

WithVerbose will configure verbosity level.

Types

type JetStreamer

type JetStreamer interface {
	JetStream(opts ...nats.JSOpt) (nats.JetStreamContext, error)
}

type Message

type Message interface {
	Ack(opts ...nats.AckOpt) error
	AckSync(opts ...nats.AckOpt) error
	Equal(msg Message) bool
	InProgress(opts ...nats.AckOpt) error
	Metadata() (*nats.MsgMetadata, error)
	Nak(opts ...nats.AckOpt) error
	NakWithDelay(delay time.Duration, opts ...nats.AckOpt) error
	Respond(proto.Message) error
	Term(opts ...nats.AckOpt) error

	Message() *nats.Msg
	Subject() string
	Reply() string
	Data() []byte
	Header() nats.Header
	QueueName() string
}

type MessageHandler

type MessageHandler func(msg Message)

type MockJetStreamer added in v0.4.3

type MockJetStreamer struct {
	mock.Mock
}

MockJetStreamer is an autogenerated mock type for the JetStreamer type

func NewMockJetStreamer added in v0.4.3

func NewMockJetStreamer(t interface {
	mock.TestingT
	Cleanup(func())
}) *MockJetStreamer

NewMockJetStreamer creates a new instance of MockJetStreamer. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. The first argument is typically a *testing.T value.

func (*MockJetStreamer) EXPECT added in v0.4.3

func (*MockJetStreamer) JetStream added in v0.4.3

func (_m *MockJetStreamer) JetStream(opts ...nats.JSOpt) (nats.JetStreamContext, error)

JetStream provides a mock function with given fields: opts

type MockJetStreamer_Expecter added in v0.4.3

type MockJetStreamer_Expecter struct {
	// contains filtered or unexported fields
}

func (*MockJetStreamer_Expecter) JetStream added in v0.4.3

func (_e *MockJetStreamer_Expecter) JetStream(opts ...interface{}) *MockJetStreamer_JetStream_Call

JetStream is a helper method to define mock.On call

  • opts ...nats.JSOpt

type MockJetStreamer_JetStream_Call added in v0.4.3

type MockJetStreamer_JetStream_Call struct {
	*mock.Call
}

MockJetStreamer_JetStream_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'JetStream'

func (*MockJetStreamer_JetStream_Call) Return added in v0.4.3

func (*MockJetStreamer_JetStream_Call) Run added in v0.4.3

func (*MockJetStreamer_JetStream_Call) RunAndReturn added in v0.4.3

type MockMessage added in v0.4.3

type MockMessage struct {
	mock.Mock
}

MockMessage is an autogenerated mock type for the Message type

func NewMockMessage added in v0.4.3

func NewMockMessage(t interface {
	mock.TestingT
	Cleanup(func())
}) *MockMessage

NewMockMessage creates a new instance of MockMessage. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. The first argument is typically a *testing.T value.

func (*MockMessage) Ack added in v0.4.3

func (_m *MockMessage) Ack(opts ...nats.AckOpt) error

Ack provides a mock function with given fields: opts

func (*MockMessage) AckSync added in v0.4.3

func (_m *MockMessage) AckSync(opts ...nats.AckOpt) error

AckSync provides a mock function with given fields: opts

func (*MockMessage) Data added in v0.4.3

func (_m *MockMessage) Data() []byte

Data provides a mock function with given fields:

func (*MockMessage) EXPECT added in v0.4.3

func (_m *MockMessage) EXPECT() *MockMessage_Expecter

func (*MockMessage) Equal added in v0.4.3

func (_m *MockMessage) Equal(msg Message) bool

Equal provides a mock function with given fields: msg

func (*MockMessage) Header added in v0.4.3

func (_m *MockMessage) Header() nats.Header

Header provides a mock function with given fields:

func (*MockMessage) InProgress added in v0.4.3

func (_m *MockMessage) InProgress(opts ...nats.AckOpt) error

InProgress provides a mock function with given fields: opts

func (*MockMessage) Message added in v0.4.3

func (_m *MockMessage) Message() *nats.Msg

Message provides a mock function with given fields:

func (*MockMessage) Metadata added in v0.4.3

func (_m *MockMessage) Metadata() (*nats.MsgMetadata, error)

Metadata provides a mock function with given fields:

func (*MockMessage) Nak added in v0.4.3

func (_m *MockMessage) Nak(opts ...nats.AckOpt) error

Nak provides a mock function with given fields: opts

func (*MockMessage) NakWithDelay added in v0.4.3

func (_m *MockMessage) NakWithDelay(delay time.Duration, opts ...nats.AckOpt) error

NakWithDelay provides a mock function with given fields: delay, opts

func (*MockMessage) QueueName added in v0.4.3

func (_m *MockMessage) QueueName() string

QueueName provides a mock function with given fields:

func (*MockMessage) Reply added in v0.4.3

func (_m *MockMessage) Reply() string

Reply provides a mock function with given fields:

func (*MockMessage) Respond added in v0.4.3

func (_m *MockMessage) Respond(_a0 protoreflect.ProtoMessage) error

Respond provides a mock function with given fields: _a0

func (*MockMessage) Subject added in v0.4.3

func (_m *MockMessage) Subject() string

Subject provides a mock function with given fields:

func (*MockMessage) Term added in v0.4.3

func (_m *MockMessage) Term(opts ...nats.AckOpt) error

Term provides a mock function with given fields: opts

type MockMessage_AckSync_Call added in v0.4.3

type MockMessage_AckSync_Call struct {
	*mock.Call
}

MockMessage_AckSync_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'AckSync'

func (*MockMessage_AckSync_Call) Return added in v0.4.3

func (*MockMessage_AckSync_Call) Run added in v0.4.3

func (_c *MockMessage_AckSync_Call) Run(run func(opts ...nats.AckOpt)) *MockMessage_AckSync_Call

func (*MockMessage_AckSync_Call) RunAndReturn added in v0.4.3

func (_c *MockMessage_AckSync_Call) RunAndReturn(run func(...nats.AckOpt) error) *MockMessage_AckSync_Call

type MockMessage_Ack_Call added in v0.4.3

type MockMessage_Ack_Call struct {
	*mock.Call
}

MockMessage_Ack_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Ack'

func (*MockMessage_Ack_Call) Return added in v0.4.3

func (*MockMessage_Ack_Call) Run added in v0.4.3

func (_c *MockMessage_Ack_Call) Run(run func(opts ...nats.AckOpt)) *MockMessage_Ack_Call

func (*MockMessage_Ack_Call) RunAndReturn added in v0.4.3

func (_c *MockMessage_Ack_Call) RunAndReturn(run func(...nats.AckOpt) error) *MockMessage_Ack_Call

type MockMessage_Data_Call added in v0.4.3

type MockMessage_Data_Call struct {
	*mock.Call
}

MockMessage_Data_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Data'

func (*MockMessage_Data_Call) Return added in v0.4.3

func (*MockMessage_Data_Call) Run added in v0.4.3

func (_c *MockMessage_Data_Call) Run(run func()) *MockMessage_Data_Call

func (*MockMessage_Data_Call) RunAndReturn added in v0.4.3

func (_c *MockMessage_Data_Call) RunAndReturn(run func() []byte) *MockMessage_Data_Call

type MockMessage_Equal_Call added in v0.4.3

type MockMessage_Equal_Call struct {
	*mock.Call
}

MockMessage_Equal_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Equal'

func (*MockMessage_Equal_Call) Return added in v0.4.3

func (*MockMessage_Equal_Call) Run added in v0.4.3

func (_c *MockMessage_Equal_Call) Run(run func(msg Message)) *MockMessage_Equal_Call

func (*MockMessage_Equal_Call) RunAndReturn added in v0.4.3

func (_c *MockMessage_Equal_Call) RunAndReturn(run func(Message) bool) *MockMessage_Equal_Call

type MockMessage_Expecter added in v0.4.3

type MockMessage_Expecter struct {
	// contains filtered or unexported fields
}

func (*MockMessage_Expecter) Ack added in v0.4.3

func (_e *MockMessage_Expecter) Ack(opts ...interface{}) *MockMessage_Ack_Call

Ack is a helper method to define mock.On call

  • opts ...nats.AckOpt

func (*MockMessage_Expecter) AckSync added in v0.4.3

func (_e *MockMessage_Expecter) AckSync(opts ...interface{}) *MockMessage_AckSync_Call

AckSync is a helper method to define mock.On call

  • opts ...nats.AckOpt

func (*MockMessage_Expecter) Data added in v0.4.3

Data is a helper method to define mock.On call

func (*MockMessage_Expecter) Equal added in v0.4.3

func (_e *MockMessage_Expecter) Equal(msg interface{}) *MockMessage_Equal_Call

Equal is a helper method to define mock.On call

  • msg Message

func (*MockMessage_Expecter) Header added in v0.4.3

Header is a helper method to define mock.On call

func (*MockMessage_Expecter) InProgress added in v0.4.3

func (_e *MockMessage_Expecter) InProgress(opts ...interface{}) *MockMessage_InProgress_Call

InProgress is a helper method to define mock.On call

  • opts ...nats.AckOpt

func (*MockMessage_Expecter) Message added in v0.4.3

Message is a helper method to define mock.On call

func (*MockMessage_Expecter) Metadata added in v0.4.3

Metadata is a helper method to define mock.On call

func (*MockMessage_Expecter) Nak added in v0.4.3

func (_e *MockMessage_Expecter) Nak(opts ...interface{}) *MockMessage_Nak_Call

Nak is a helper method to define mock.On call

  • opts ...nats.AckOpt

func (*MockMessage_Expecter) NakWithDelay added in v0.4.3

func (_e *MockMessage_Expecter) NakWithDelay(delay interface{}, opts ...interface{}) *MockMessage_NakWithDelay_Call

NakWithDelay is a helper method to define mock.On call

  • delay time.Duration
  • opts ...nats.AckOpt

func (*MockMessage_Expecter) QueueName added in v0.4.3

QueueName is a helper method to define mock.On call

func (*MockMessage_Expecter) Reply added in v0.4.3

Reply is a helper method to define mock.On call

func (*MockMessage_Expecter) Respond added in v0.4.3

func (_e *MockMessage_Expecter) Respond(_a0 interface{}) *MockMessage_Respond_Call

Respond is a helper method to define mock.On call

  • _a0 protoreflect.ProtoMessage

func (*MockMessage_Expecter) Subject added in v0.4.3

Subject is a helper method to define mock.On call

func (*MockMessage_Expecter) Term added in v0.4.3

func (_e *MockMessage_Expecter) Term(opts ...interface{}) *MockMessage_Term_Call

Term is a helper method to define mock.On call

  • opts ...nats.AckOpt

type MockMessage_Header_Call added in v0.4.3

type MockMessage_Header_Call struct {
	*mock.Call
}

MockMessage_Header_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Header'

func (*MockMessage_Header_Call) Return added in v0.4.3

func (*MockMessage_Header_Call) Run added in v0.4.3

func (_c *MockMessage_Header_Call) Run(run func()) *MockMessage_Header_Call

func (*MockMessage_Header_Call) RunAndReturn added in v0.4.3

func (_c *MockMessage_Header_Call) RunAndReturn(run func() nats.Header) *MockMessage_Header_Call

type MockMessage_InProgress_Call added in v0.4.3

type MockMessage_InProgress_Call struct {
	*mock.Call
}

MockMessage_InProgress_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'InProgress'

func (*MockMessage_InProgress_Call) Return added in v0.4.3

func (*MockMessage_InProgress_Call) Run added in v0.4.3

func (*MockMessage_InProgress_Call) RunAndReturn added in v0.4.3

type MockMessage_Message_Call added in v0.4.3

type MockMessage_Message_Call struct {
	*mock.Call
}

MockMessage_Message_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Message'

func (*MockMessage_Message_Call) Return added in v0.4.3

func (*MockMessage_Message_Call) Run added in v0.4.3

func (*MockMessage_Message_Call) RunAndReturn added in v0.4.3

func (_c *MockMessage_Message_Call) RunAndReturn(run func() *nats.Msg) *MockMessage_Message_Call

type MockMessage_Metadata_Call added in v0.4.3

type MockMessage_Metadata_Call struct {
	*mock.Call
}

MockMessage_Metadata_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Metadata'

func (*MockMessage_Metadata_Call) Return added in v0.4.3

func (*MockMessage_Metadata_Call) Run added in v0.4.3

func (*MockMessage_Metadata_Call) RunAndReturn added in v0.4.3

func (_c *MockMessage_Metadata_Call) RunAndReturn(run func() (*nats.MsgMetadata, error)) *MockMessage_Metadata_Call

type MockMessage_NakWithDelay_Call added in v0.4.3

type MockMessage_NakWithDelay_Call struct {
	*mock.Call
}

MockMessage_NakWithDelay_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'NakWithDelay'

func (*MockMessage_NakWithDelay_Call) Return added in v0.4.3

func (*MockMessage_NakWithDelay_Call) Run added in v0.4.3

func (*MockMessage_NakWithDelay_Call) RunAndReturn added in v0.4.3

type MockMessage_Nak_Call added in v0.4.3

type MockMessage_Nak_Call struct {
	*mock.Call
}

MockMessage_Nak_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Nak'

func (*MockMessage_Nak_Call) Return added in v0.4.3

func (*MockMessage_Nak_Call) Run added in v0.4.3

func (_c *MockMessage_Nak_Call) Run(run func(opts ...nats.AckOpt)) *MockMessage_Nak_Call

func (*MockMessage_Nak_Call) RunAndReturn added in v0.4.3

func (_c *MockMessage_Nak_Call) RunAndReturn(run func(...nats.AckOpt) error) *MockMessage_Nak_Call

type MockMessage_QueueName_Call added in v0.4.3

type MockMessage_QueueName_Call struct {
	*mock.Call
}

MockMessage_QueueName_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'QueueName'

func (*MockMessage_QueueName_Call) Return added in v0.4.3

func (*MockMessage_QueueName_Call) Run added in v0.4.3

func (*MockMessage_QueueName_Call) RunAndReturn added in v0.4.3

func (_c *MockMessage_QueueName_Call) RunAndReturn(run func() string) *MockMessage_QueueName_Call

type MockMessage_Reply_Call added in v0.4.3

type MockMessage_Reply_Call struct {
	*mock.Call
}

MockMessage_Reply_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Reply'

func (*MockMessage_Reply_Call) Return added in v0.4.3

func (*MockMessage_Reply_Call) Run added in v0.4.3

func (_c *MockMessage_Reply_Call) Run(run func()) *MockMessage_Reply_Call

func (*MockMessage_Reply_Call) RunAndReturn added in v0.4.3

func (_c *MockMessage_Reply_Call) RunAndReturn(run func() string) *MockMessage_Reply_Call

type MockMessage_Respond_Call added in v0.4.3

type MockMessage_Respond_Call struct {
	*mock.Call
}

MockMessage_Respond_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Respond'

func (*MockMessage_Respond_Call) Return added in v0.4.3

func (*MockMessage_Respond_Call) Run added in v0.4.3

func (*MockMessage_Respond_Call) RunAndReturn added in v0.4.3

type MockMessage_Subject_Call added in v0.4.3

type MockMessage_Subject_Call struct {
	*mock.Call
}

MockMessage_Subject_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Subject'

func (*MockMessage_Subject_Call) Return added in v0.4.3

func (*MockMessage_Subject_Call) Run added in v0.4.3

func (*MockMessage_Subject_Call) RunAndReturn added in v0.4.3

func (_c *MockMessage_Subject_Call) RunAndReturn(run func() string) *MockMessage_Subject_Call

type MockMessage_Term_Call added in v0.4.3

type MockMessage_Term_Call struct {
	*mock.Call
}

MockMessage_Term_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Term'

func (*MockMessage_Term_Call) Return added in v0.4.3

func (*MockMessage_Term_Call) Run added in v0.4.3

func (_c *MockMessage_Term_Call) Run(run func(opts ...nats.AckOpt)) *MockMessage_Term_Call

func (*MockMessage_Term_Call) RunAndReturn added in v0.4.3

func (_c *MockMessage_Term_Call) RunAndReturn(run func(...nats.AckOpt) error) *MockMessage_Term_Call

type Service

type Service struct {
	options.Options
	// contains filtered or unexported fields
}

Service is the base publisher structure. It must be embedded in the publisher to benefit from the common implementation. Config method must be called on this embedded struct in order to properly set it up.

func (*Service) AddStatusCallback

func (b *Service) AddStatusCallback(callback StatusFunc)

AddStatusCallback will register a status callback.

func (*Service) AddStream

func (b *Service) AddStream(maxMsgs, maxBytes uint64, age time.Duration, subjects ...string) error

AddStream is an experimental feature that creates a durable stream. It is possible to subscribe to this durable stream using regular Subscribe or SubscribeTo methods given that the subject is included in the created stream.

The interface for this feature is experimental and it should be expected to change.

This method will create a stream with maxMsgs, maxBytes, and age for a list of subjects on JetStream if it does not exist. This is a temporary solution so that the stream doesn't have to be created manually. However, this will change in the near future, therefore users will have to make sure that the stream exists before calling this method(maxMsgs, maxBytes, and age parameters will be removed).

NOTE: Messages are automatically acknowledged after handler returns.

func (*Service) Close

func (b *Service) Close() error

Close should be closed to clean-up the publisher.

func (*Service) ConcatenateStatus

func (b *Service) ConcatenateStatus(path string, status, items map[string]string) map[string]string

Concatenate status copies (k,v) from items to status optionally prefixing the key with path.

func (*Service) Configure

func (b *Service) Configure(opts ...options.Option) error

Configure must be called by the publisher implementation.

func (*Service) Fail

func (b *Service) Fail(err error)

Fail is a convenience function that allows to asynchronously propagate errors.

func (*Service) Publish

func (b *Service) Publish(msg proto.Message, suffixes ...string) error

Publish will sign the message and publish it to a subject constructed from "{prefix}.{name}.{suffixes}". Publish will use PubNats connection.

func (*Service) PublishBuf

func (b *Service) PublishBuf(buf []byte, suffixes ...string) error

PublishBuf is the same as Publish, but will publish the raw bytes.

func (*Service) PublishBufTo

func (b *Service) PublishBufTo(buf []byte, tokens ...string) error

PublishBufTo is the same as PublishTo, but for raw bytes.

func (*Service) PublishBufToRpc added in v0.4.3

func (b *Service) PublishBufToRpc(buf []byte, replyTo string, tokens ...string) error

PublishBufToRpc is the same as PublishBufTo, but uses ReqNats.

func (*Service) PublishTo

func (b *Service) PublishTo(msg proto.Message, tokens ...string) error

PublishTo will sign the message and publish it to a specific subject constructed from subject tokens. PublishTo will use PubNats connection.

func (*Service) PublishToRpc added in v0.4.3

func (b *Service) PublishToRpc(msg proto.Message, replyTo string, tokens ...string) error

PublishToRpc will sign the message and publish it to a specific subject constructed from subject tokens. PublishToRpc will use ReqNats connection.

func (*Service) RemoveStatusCallback

func (b *Service) RemoveStatusCallback(callback StatusFunc)

RemoveStatusCallback will remove a status callback.

func (*Service) RemoveStream

func (b *Service) RemoveStream(subjects ...string) error

RemoveStream will attempt to remove consumers based on a list of subjects. List of subjects must be exactly the same as was used in AddStream since js Consumer names are based on the subjects.

func (*Service) RequestBufFrom

func (b *Service) RequestBufFrom(ctx context.Context, buf []byte, tokens ...string) (Message, error)

RequestBufFrom requests a reply from a subject using ReqNats connection. This a synchronous operation that does not involve publisher queue.

func (*Service) RequestFrom

func (b *Service) RequestFrom(ctx context.Context, msg proto.Message, resp proto.Message, tokens ...string) (Message, error)

RequestFrom requests a reply from a subject using ReqNats connection. The subject will be constructed from tokens. This a synchronous operation that does not involve publisher queue.

func (*Service) Respond

func (b *Service) Respond(nmsg Message, msg proto.Message) error

Respond will respond to a message sent as a request. This is a helper function and Message.Respond should be used instead.

func (*Service) RespondBuf

func (b *Service) RespondBuf(msg Message, buf []byte) error

RespondBuf is the same as Respond, but will respond with raw bytes.

func (*Service) RpcInbox added in v0.4.3

func (b *Service) RpcInbox(suffixes ...string) string

RpcInbox returns a unique subject

func (*Service) Serve

func (b *Service) Serve(handler ServiceHandler, suffixes ...string) (*nats.Subscription, error)

Serve is a convenience method to serve a service subject. It acts the same as Subscribe, but takes `ServiceHandler` instead, and will respond either with Error type or response from the handler. Serve will use ReqNats connection.

func (*Service) Sign

func (b *Service) Sign(msg []byte) (signature []byte, publicKey []byte, err error)

Sign will sign the bytes.

func (*Service) Start

func (b *Service) Start() context.Context

Start will start the publisher base implementation that includes telemetry and other internal goroutines.

func (*Service) Subscribe

func (b *Service) Subscribe(handler MessageHandler, suffixes ...string) (*nats.Subscription, error)

Subscribe will subscribe to a subject constructed from {prefix}.{name}.{...suffixes}, where suffixes are joined using ".". Subscribe will use SubNats connection.

func (*Service) SubscribeTo

func (b *Service) SubscribeTo(handler MessageHandler, tokens ...string) (*nats.Subscription, error)

SubscribeTo will subscribe to a subject constructed as {...tokens}, where tokens are joined using ".".

Experimental: When a stream was registered with AddStream SubscribeTo will use durable stream instead of realtime.

func (*Service) Unmarshal

func (b *Service) Unmarshal(nmsg Message, msg proto.Message) (nats.Header, error)

Unmarshal is a convenience function that first verifies any signatures in the message and unmarshals bytes into a message.

func (*Service) Verify

func (b *Service) Verify(nmsg Message) error

Verify will marshal the unmarshalled payload back to bytes and verifies the signature with that.

type ServiceHandler

type ServiceHandler func(msg Message) (proto.Message, error)

type StatusFunc

type StatusFunc func() map[string]string

StatusFunc is a type for callback func. This will be called periodically to construct telemetry status. "uptime", "goroutines", "period" keys will be overriden internally.

type Subject

type Subject string

Subject represents a NATS subject which can include wildcards.

func NewSubject added in v0.4.3

func NewSubject(prefix string, suffixes ...string) Subject

func (Subject) Match

func (s Subject) Match(subject Subject) bool

Match tries to pattern-match the subject against another subject. It considers NATS wildcard rules where '*' matches any token at a level, and '>' matches all subsequent tokens.

func (Subject) RemainingTokens

func (s Subject) RemainingTokens(pattern []string) []string

RemainingTokens matches subject tokens until the end of subject tokens and returns the remaining tokens.

func (Subject) String

func (s Subject) String() string

String converts the Subject back to its string representation.

func (Subject) SymmetricMatch

func (s Subject) SymmetricMatch(subject Subject) bool

SymmetricMatch tries to pattern-match a subject against another subject in both directions. It considers a match if either subject matches the other according to NATS wildcard rules.

func (Subject) Tokens

func (s Subject) Tokens() []string

Tokens splits the Subject into its constituent parts, divided by '.'.

func (Subject) Validate

func (s Subject) Validate() error

Validate checks if the Subject contains any characters that are not allowed.

type SubjectMap

type SubjectMap map[Subject]int

SubjectMap maps subjects (as strings) to indices to an array that holds stream and consumer configs.

func (SubjectMap) Add

func (m SubjectMap) Add(subject Subject, idx int) error

Add inserts a subject and its associated index into the map. It returns an error if the subject is invalid, but in the current implementation, it always succeeds.

func (SubjectMap) Get

func (m SubjectMap) Get(subject Subject) (int, bool)

Get returns the index stored

func (SubjectMap) Search

func (m SubjectMap) Search(subject Subject) (Subject, int, bool)

Search looks for a subject in the map that matches the given subject according to NATS pattern matching rules. Returns the matching subject, its associated index, and true if a match is found. If no match is found, it returns empty string, 0, and false.

func (SubjectMap) SymmetricSearch

func (m SubjectMap) SymmetricSearch(subject Subject) (Subject, int, bool)

SymmetricSearch looks for a subject in the map that symmetrically matches the given subject. Symmetric matching means either the map's subject matches the given subject or vice versa. Returns the matching subject, its associated index, and true if a match is found. If no match is found, it returns empty string, 0, and false.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL