Documentation
¶
Overview ¶
Package natsjs provides the nats jetstream event client for go-orb.
Index ¶
- Constants
- Variables
- func Provide(configData map[string]any, logger log.Logger, opts ...event.Option) (event.Type, error)
- func WithAllowReconnect(allowReconnect bool) event.Option
- func WithCompression(compression bool) event.Option
- func WithDisableDurableStreams(disableDurableStreams bool) event.Option
- func WithDrainTimeout(drainTimeout time.Duration) event.Option
- func WithFlusherTimeout(flusherTimeout time.Duration) event.Option
- func WithIgnoreAuthErrorAbort(ignoreAuthErrorAbort bool) event.Option
- func WithInboxPrefix(inboxPrefix string) event.Option
- func WithMaxConcurrent(n int) event.Option
- func WithMaxPingsOut(maxPingsOut int) event.Option
- func WithMaxReconnect(maxReconnect int) event.Option
- func WithNkey(nkey string) event.Option
- func WithNoCallbacksAfterClientClose(noCallbacksAfterClientClose bool) event.Option
- func WithNoRandomize(noRandomize bool) event.Option
- func WithPassword(password string) event.Option
- func WithPingInterval(pingInterval time.Duration) event.Option
- func WithProxyPath(proxyPath string) event.Option
- func WithPublishCodec(n string) event.Option
- func WithReconnectBufSize(reconnectBufSize int) event.Option
- func WithReconnectJitter(reconnectJitter time.Duration) event.Option
- func WithReconnectJitterTLS(reconnectJitterTLS time.Duration) event.Option
- func WithReconnectWait(reconnectWait time.Duration) event.Option
- func WithRequestCodec(n string) event.Option
- func WithRetryOnFailedConnect(retryOnFailedConnect bool) event.Option
- func WithServers(servers []string) event.Option
- func WithSkipHostLookup(skipHostLookup bool) event.Option
- func WithSubChanLen(subChanLen int) event.Option
- func WithSyncPublish(syncPublish bool) event.Option
- func WithTimeout(timeout time.Duration) event.Option
- func WithToken(token string) event.Option
- func WithURL(url string) event.Option
- func WithUseOldRequestStyle(useOldRequestStyle bool) event.Option
- func WithUser(user string) event.Option
- type Config
- type NatsJS
- func (n *NatsJS) Clone() event.Type
- func (n *NatsJS) Consume(topic string, opts ...event.ConsumeOption) (<-chan event.Event, error)
- func (n *NatsJS) GetPublishCodec() codecs.Marshaler
- func (n *NatsJS) HandleRequest(ctx context.Context, topic string, ...)
- func (n *NatsJS) Publish(ctx context.Context, topic string, msg any, opts ...event.PublishOption) error
- func (n *NatsJS) Request(ctx context.Context, req *event.Req[[]byte, any], opts ...event.RequestOption) ([]byte, error)
- func (n *NatsJS) Start(_ context.Context) error
- func (n *NatsJS) Stop(_ context.Context) error
- func (n *NatsJS) String() string
- func (n *NatsJS) Type() string
- type NatsOptions
Constants ¶
const Name = "natsjs"
Name provides the name of this event client.
Variables ¶
var ( DefaultMaxConcurrent = 256 // DefaultSyncPublish allows using a synchronous publishing instead of the default asynchronous. DefaultSyncPublish = false // DefaultDisableDurableStreams configures whether to disable durable streams. DefaultDisableDurableStreams = false )
Defaults.
Functions ¶
func Provide ¶
func Provide( configData map[string]any, logger log.Logger, opts ...event.Option, ) (event.Type, error)
Provide creates a new NatsJS event client.
func WithAllowReconnect ¶ added in v0.2.0
WithAllowReconnect enables reconnection logic when disconnected from the server.
func WithCompression ¶ added in v0.2.0
WithCompression enables compression for websocket connections.
func WithDisableDurableStreams ¶ added in v0.2.0
WithDisableDurableStreams configures whether to disable durable streams.
func WithDrainTimeout ¶ added in v0.2.0
WithDrainTimeout sets the timeout for a Drain Operation to complete.
func WithFlusherTimeout ¶ added in v0.2.0
WithFlusherTimeout sets the maximum time to wait for write operations to complete.
func WithIgnoreAuthErrorAbort ¶ added in v0.2.0
WithIgnoreAuthErrorAbort opts out of aborting reconnect attempts on repeated auth errors.
func WithInboxPrefix ¶ added in v0.2.0
WithInboxPrefix allows customizing the default _INBOX prefix.
func WithMaxConcurrent ¶
WithMaxConcurrent sets the number of concurrent workers.
func WithMaxPingsOut ¶ added in v0.2.0
WithMaxPingsOut sets the maximum number of pending ping commands before raising an error.
func WithMaxReconnect ¶ added in v0.2.0
WithMaxReconnect sets the number of reconnect attempts before giving up.
func WithNoCallbacksAfterClientClose ¶ added in v0.2.0
WithNoCallbacksAfterClientClose prevents callbacks after Close() is called.
func WithNoRandomize ¶ added in v0.2.0
WithNoRandomize configures whether to randomize the server pool.
func WithPassword ¶ added in v0.2.0
WithPassword sets the password for authentication.
func WithPingInterval ¶ added in v0.2.0
WithPingInterval sets the period for sending ping commands to the server.
func WithProxyPath ¶ added in v0.2.0
WithProxyPath adds a path to connections URL for websocket connections.
func WithPublishCodec ¶ added in v0.2.0
WithPublishCodec sets the internal codec.
func WithReconnectBufSize ¶ added in v0.2.0
WithReconnectBufSize sets the size of the backing bufio during reconnect.
func WithReconnectJitter ¶ added in v0.2.0
WithReconnectJitter sets the upper bound for random delay added to ReconnectWait.
func WithReconnectJitterTLS ¶ added in v0.2.0
WithReconnectJitterTLS sets the upper bound for random delay added to ReconnectWait when TLS is used.
func WithReconnectWait ¶ added in v0.2.0
WithReconnectWait sets the time to backoff after attempting a reconnect.
func WithRequestCodec ¶ added in v0.2.0
WithRequestCodec sets the internal codec.
func WithRetryOnFailedConnect ¶ added in v0.2.0
WithRetryOnFailedConnect sets the connection in reconnecting state if it can't connect initially.
func WithServers ¶ added in v0.2.0
WithServers sets the list of NATS servers to connect to.
func WithSkipHostLookup ¶ added in v0.2.0
WithSkipHostLookup skips the DNS lookup for the server hostname.
func WithSubChanLen ¶ added in v0.2.0
WithSubChanLen sets the size of the buffered channel used for SyncSubscriptions.
func WithSyncPublish ¶ added in v0.2.0
WithSyncPublish configures whether to publish synchronously.
func WithTimeout ¶ added in v0.2.0
WithTimeout sets the timeout for a Dial operation on a connection.
func WithUseOldRequestStyle ¶ added in v0.2.0
WithUseOldRequestStyle forces the old method of Requests.
Types ¶
type Config ¶
type Config struct {
event.Config `yaml:",inline"`
NatsOptions `yaml:",inline"`
// RequestCodec configures the codec to use for encoding and decoding.
// Default: application/x-protobuf
RequestCodec string `json:"requestCodec,omitempty" yaml:"requestCodec,omitempty"`
// PublishCodec configures the codec to use for publishing.
// Default: application/json
PublishCodec string `json:"publishCodec,omitempty" yaml:"publishCodec,omitempty"`
// MaxConcurrent configures the maximum number of concurrent workers.
// Default: 10
MaxConcurrent int `json:"maxConcurrent,omitempty" yaml:"maxConcurrent,omitempty"`
// SyncPublish configures whether to publish synchronously.
// Default: false
SyncPublish bool `json:"syncPublish,omitempty" yaml:"syncPublish,omitempty"`
// DisableDurableStreams configures whether to disable durable streams.
// Default: false
DisableDurableStreams bool `json:"disableDurableStreams,omitempty" yaml:"disableDurableStreams,omitempty"`
}
Config provides configuration for the NATS event client.
func (*Config) ApplyOptions ¶
ApplyOptions applies a set of options to the config.
type NatsJS ¶
type NatsJS struct {
// contains filtered or unexported fields
}
NatsJS is the nats jetstream event client for go-orb.
func New ¶
New creates a new NATS registry. This functions should rarely be called manually. To create a new registry use Provide.
func (*NatsJS) GetPublishCodec ¶ added in v0.2.0
GetPublishCodec returns the codec used by the handler for publish.
func (*NatsJS) HandleRequest ¶
func (n *NatsJS) HandleRequest( ctx context.Context, topic string, callbackHandler func(context.Context, *event.Req[[]byte, []byte]), )
HandleRequest subscribes to the given topic and handles the requests.
func (*NatsJS) Publish ¶ added in v0.2.0
func (n *NatsJS) Publish(ctx context.Context, topic string, msg any, opts ...event.PublishOption) error
Publish a message to a topic.
type NatsOptions ¶ added in v0.2.0
type NatsOptions struct {
// URL represents a single NATS server url to which the client
// will be connecting. If the Servers option is also set, it
// then becomes the first server in the Servers array.
URL string `json:"url,omitempty" yaml:"url,omitempty"`
// InProcessServer represents a NATS server running within the
// same process. If this is set then we will attempt to connect
// to the server directly rather than using external TCP conns.
InProcessServer nats.InProcessConnProvider `json:"-" yaml:"-"`
// Servers is a configured set of servers which this client
// will use when attempting to connect.
Servers []string `json:"servers,omitempty" yaml:"servers,omitempty"`
// NoRandomize configures whether we will randomize the
// server pool.
NoRandomize bool `json:"noRandomize,omitempty" yaml:"noRandomize,omitempty"`
// AllowReconnect enables reconnection logic to be used when we
// encounter a disconnect from the current server.
AllowReconnect bool `json:"allowReconnect,omitempty" yaml:"allowReconnect,omitempty"`
// MaxReconnect sets the number of reconnect attempts that will be
// tried before giving up. If negative, then it will never give up
// trying to reconnect.
// Defaults to 60.
MaxReconnect int `json:"maxReconnect,omitempty" yaml:"maxReconnect,omitempty"`
// ReconnectWait sets the time to backoff after attempting a reconnect
// to a server that we were already connected to previously.
// Defaults to 2s.
ReconnectWait config.Duration `json:"reconnectWait,omitempty" yaml:"reconnectWait,omitempty"`
// CustomReconnectDelayCB is invoked after the library tried every
// URL in the server list and failed to reconnect. It passes to the
// user the current number of attempts. This function returns the
// amount of time the library will sleep before attempting to reconnect
// again. It is strongly recommended that this value contains some
// jitter to prevent all connections to attempt reconnecting at the same time.
CustomReconnectDelayCB nats.ReconnectDelayHandler `json:"-" yaml:"-"`
// ReconnectJitter sets the upper bound for a random delay added to
// ReconnectWait during a reconnect when no TLS is used.
// Defaults to 100ms.
ReconnectJitter config.Duration `json:"reconnectJitter,omitempty" yaml:"reconnectJitter,omitempty"`
// ReconnectJitterTLS sets the upper bound for a random delay added to
// ReconnectWait during a reconnect when TLS is used.
// Defaults to 1s.
ReconnectJitterTLS config.Duration `json:"reconnectJitterTls,omitempty" yaml:"reconnectJitterTls,omitempty"`
// Timeout sets the timeout for a Dial operation on a connection.
// Defaults to 2s.
Timeout config.Duration `json:"timeout,omitempty" yaml:"timeout,omitempty"`
// DrainTimeout sets the timeout for a Drain Operation to complete.
// Defaults to 30s.
DrainTimeout config.Duration `json:"drainTimeout,omitempty" yaml:"drainTimeout,omitempty"`
// FlusherTimeout is the maximum time to wait for write operations
// to the underlying connection to complete (including the flusher loop).
// Defaults to 1m.
FlusherTimeout config.Duration `json:"flusherTimeout,omitempty" yaml:"flusherTimeout,omitempty"`
// PingInterval is the period at which the client will be sending ping
// commands to the server, disabled if 0 or negative.
// Defaults to 2m.
PingInterval config.Duration `json:"pingInterval,omitempty" yaml:"pingInterval,omitempty"`
// MaxPingsOut is the maximum number of pending ping commands that can
// be awaiting a response before raising an ErrStaleConnection error.
// Defaults to 2.
MaxPingsOut int `json:"maxPingsOut,omitempty" yaml:"maxPingsOut,omitempty"`
// ClosedCB sets the closed handler that is called when a client will
// no longer be connected.
ClosedCB nats.ConnHandler `json:"-" yaml:"-"`
// DisconnectedErrCB sets the disconnected error handler that is called
// whenever the connection is disconnected.
// Disconnected error could be nil, for instance when user explicitly closes the connection.
// DisconnectedCB will not be called if DisconnectedErrCB is set
DisconnectedErrCB nats.ConnErrHandler `json:"-" yaml:"-"`
// ConnectedCB sets the connected handler called when the initial connection
// is established. It is not invoked on successful reconnects - for reconnections,
// use ReconnectedCB. ConnectedCB can be used in conjunction with RetryOnFailedConnect
// to detect whether the initial connect was successful.
ConnectedCB nats.ConnHandler `json:"-" yaml:"-"`
// ReconnectedCB sets the reconnected handler called whenever
// the connection is successfully reconnected.
ReconnectedCB nats.ConnHandler `json:"-" yaml:"-"`
// DiscoveredServersCB sets the callback that is invoked whenever a new
// server has joined the cluster.
DiscoveredServersCB nats.ConnHandler `json:"-" yaml:"-"`
// AsyncErrorCB sets the async error handler (e.g. slow consumer errors)
AsyncErrorCB nats.ErrHandler `json:"-" yaml:"-"`
// ReconnectBufSize is the size of the backing bufio during reconnect.
// Once this has been exhausted publish operations will return an error.
// Defaults to 8388608 bytes (8MB).
ReconnectBufSize int `json:"reconnectBufSize,omitempty" yaml:"reconnectBufSize,omitempty"`
// SubChanLen is the size of the buffered channel used between the socket
// Go routine and the message delivery for SyncSubscriptions.
// NOTE: This does not affect AsyncSubscriptions which are
// dictated by PendingLimits()
// Defaults to 65536.
SubChanLen int `json:"subChanLen,omitempty" yaml:"subChanLen,omitempty"`
// UserJWT sets the callback handler that will fetch a user's JWT.
UserJWT nats.UserJWTHandler `json:"-" yaml:"-"`
// Nkey sets the public nkey that will be used to authenticate
// when connecting to the server. UserJWT and Nkey are mutually exclusive
// and if defined, UserJWT will take precedence.
Nkey string `json:"nkey,omitempty" yaml:"nkey,omitempty"`
// SignatureCB designates the function used to sign the nonce
// presented from the server.
SignatureCB nats.SignatureHandler `json:"-" yaml:"-"`
// User sets the username to be used when connecting to the server.
User string `json:"user,omitempty" yaml:"user,omitempty"`
// Password sets the password to be used when connecting to a server.
Password string `json:"password,omitempty" yaml:"password,omitempty"`
// Token sets the token to be used when connecting to a server.
Token string `json:"token,omitempty" yaml:"token,omitempty"`
// TokenHandler designates the function used to generate the token to be used when connecting to a server.
TokenHandler nats.AuthTokenHandler `json:"-" yaml:"-"`
// CustomDialer allows to specify a custom dialer (not necessarily
// a *net.Dialer).
CustomDialer nats.CustomDialer `json:"-" yaml:"-"`
// UseOldRequestStyle forces the old method of Requests that utilize
// a new Inbox and a new Subscription for each request.
UseOldRequestStyle bool `json:"useOldRequestStyle,omitempty" yaml:"useOldRequestStyle,omitempty"`
// NoCallbacksAfterClientClose allows preventing the invocation of
// callbacks after Close() is called. Client won't receive notifications
// when Close is invoked by user code. Default is to invoke the callbacks.
NoCallbacksAfterClientClose bool `json:"noCallbacksAfterClientClose,omitempty" yaml:"noCallbacksAfterClientClose,omitempty"`
// LameDuckModeHandler sets the callback to invoke when the server notifies
// the connection that it entered lame duck mode, that is, going to
// gradually disconnect all its connections before shutting down. This is
// often used in deployments when upgrading NATS Servers.
LameDuckModeHandler nats.ConnHandler `json:"-" yaml:"-"`
// RetryOnFailedConnect sets the connection in reconnecting state right
// away if it can't connect to a server in the initial set. The
// MaxReconnect and ReconnectWait options are used for this process,
// similarly to when an established connection is disconnected.
// If a ReconnectHandler is set, it will be invoked on the first
// successful reconnect attempt (if the initial connect fails),
// and if a ClosedHandler is set, it will be invoked if
// it fails to connect (after exhausting the MaxReconnect attempts).
RetryOnFailedConnect bool `json:"retryOnFailedConnect,omitempty" yaml:"retryOnFailedConnect,omitempty"`
// For websocket connections, indicates to the server that the connection
// supports compression. If the server does too, then data will be compressed.
Compression bool `json:"compression,omitempty" yaml:"compression,omitempty"`
// For websocket connections, adds a path to connections url.
// This is useful when connecting to NATS behind a proxy.
ProxyPath string `json:"proxyPath,omitempty" yaml:"proxyPath,omitempty"`
// InboxPrefix allows the default _INBOX prefix to be customized
InboxPrefix string `json:"inboxPrefix,omitempty" yaml:"inboxPrefix,omitempty"`
// IgnoreAuthErrorAbort - if set to true, client opts out of the default connect behavior of aborting
// subsequent reconnect attempts if server returns the same auth error twice (regardless of reconnect policy).
IgnoreAuthErrorAbort bool `json:"ignoreAuthErrorAbort,omitempty" yaml:"ignoreAuthErrorAbort,omitempty"`
// SkipHostLookup skips the DNS lookup for the server hostname.
SkipHostLookup bool `json:"skipHostLookup,omitempty" yaml:"skipHostLookup,omitempty"`
}
NatsOptions can be used to create a customized connection.
func (NatsOptions) ToOptions ¶ added in v0.2.0
func (o NatsOptions) ToOptions() nats.Options
ToOptions converts the NatsOptions to nats.Options.