Documentation
¶
Overview ¶
Package jetstream provides interface and two implementations to connect Nats JetStream.
Function NewDefaultJetStreamClient(url string, opts ...nats.Option) returns a client with default implementation, which relies on the input url and other nats connection options.
Function NewInClusterJetStreamClient() assumes the invoker is in a Kubernetes cluster, and there are several environment variables are available, which are used to connect to the Nats JetStream server. Those environment variables include:
NUMAFLOW_ISBSVC_JETSTREAM_URL, NUMAFLOW_ISBSVC_JETSTREAM_USER, NUMAFLOW_ISBSVC_JETSTREAM_PASSWORD, NUMAFLOW_ISBSVC_JETSTREAM_TLS_ENABLED (optional)
When using InClusterJetStreamClient, it has ability to auto reconnect if corresponding parameter is enabled in function Connect().
For example:
client.Connect(ctx, AutoReconnect())
Index ¶
- func NewDefaultJetStreamClient(url string, opts ...nats.Option) *defaultJetStreamClient
- func NewInClusterJetStreamClient() *inClusterJetStreamClient
- type JetStreamClient
- type JetStreamClientOption
- type JetStreamContext
- func (jsc *JetStreamContext) AddConsumer(stream string, cfg *nats.ConsumerConfig, opts ...nats.JSOpt) (*nats.ConsumerInfo, error)
- func (jsc *JetStreamContext) AddStream(cfg *nats.StreamConfig, opts ...nats.JSOpt) (*nats.StreamInfo, error)
- func (jsc *JetStreamContext) ConsumerInfo(stream string, name string, opts ...nats.JSOpt) (*nats.ConsumerInfo, error)
- func (jsc *JetStreamContext) CreateKeyValue(cfg *nats.KeyValueConfig) (nats.KeyValue, error)
- func (jsc *JetStreamContext) DeleteConsumer(stream string, consumer string, opts ...nats.JSOpt) error
- func (jsc *JetStreamContext) DeleteKeyValue(bucket string) error
- func (jsc *JetStreamContext) DeleteStream(name string, opts ...nats.JSOpt) error
- func (jsc *JetStreamContext) KeyValue(bucket string) (nats.KeyValue, error)
- func (jsc *JetStreamContext) PublishMsgAsync(m *nats.Msg, opts ...nats.PubOpt) (nats.PubAckFuture, error)
- func (jsc *JetStreamContext) PullSubscribe(subj string, durable string, opts ...nats.SubOpt) (*nats.Subscription, error)
- func (jsc *JetStreamContext) StreamInfo(stream string, opts ...nats.JSOpt) (*nats.StreamInfo, error)
- type NatsConn
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func NewDefaultJetStreamClient ¶
NewDefaultJetStreamClient is used to get a default JetStream client instance
func NewInClusterJetStreamClient ¶
func NewInClusterJetStreamClient() *inClusterJetStreamClient
NewInClusterJetStreamClient return an instance of inClusterJetStreamClient
Types ¶
type JetStreamClient ¶
type JetStreamClient interface {
Connect(ctx context.Context, opts ...JetStreamClientOption) (*NatsConn, error)
}
JetStreamClient is used to provide a jetstream client
type JetStreamClientOption ¶
type JetStreamClientOption func(*jsClientOptions)
func ConnectionCheckInterval ¶
func ConnectionCheckInterval(d time.Duration) JetStreamClientOption
ConnectionCheckInterval is an Option to set connection check interval.
func DisconnectErrHandler ¶
func DisconnectErrHandler(f func(*NatsConn, error)) JetStreamClientOption
DisconnectErrHandler is an option to set disconnect handler.
func NoReconnect ¶
func NoReconnect() JetStreamClientOption
NoReconnect is an Option to set no auto reconnect.
func ReconnectHandler ¶
func ReconnectHandler(f func(*NatsConn)) JetStreamClientOption
ReconnectHandler is an Option to set reconnect handler.
type JetStreamContext ¶
type JetStreamContext struct {
// contains filtered or unexported fields
}
JetStreamContext is a proxy struct to nats.JetStreamContext The existence of this proxy is to replace underlying nats.JetStreamContext with new one after reconnection.
func (*JetStreamContext) AddConsumer ¶
func (jsc *JetStreamContext) AddConsumer(stream string, cfg *nats.ConsumerConfig, opts ...nats.JSOpt) (*nats.ConsumerInfo, error)
func (*JetStreamContext) AddStream ¶
func (jsc *JetStreamContext) AddStream(cfg *nats.StreamConfig, opts ...nats.JSOpt) (*nats.StreamInfo, error)
func (*JetStreamContext) ConsumerInfo ¶
func (jsc *JetStreamContext) ConsumerInfo(stream string, name string, opts ...nats.JSOpt) (*nats.ConsumerInfo, error)
func (*JetStreamContext) CreateKeyValue ¶
func (jsc *JetStreamContext) CreateKeyValue(cfg *nats.KeyValueConfig) (nats.KeyValue, error)
func (*JetStreamContext) DeleteConsumer ¶
func (*JetStreamContext) DeleteKeyValue ¶
func (jsc *JetStreamContext) DeleteKeyValue(bucket string) error
func (*JetStreamContext) DeleteStream ¶
func (jsc *JetStreamContext) DeleteStream(name string, opts ...nats.JSOpt) error
func (*JetStreamContext) KeyValue ¶
func (jsc *JetStreamContext) KeyValue(bucket string) (nats.KeyValue, error)
func (*JetStreamContext) PublishMsgAsync ¶
func (jsc *JetStreamContext) PublishMsgAsync(m *nats.Msg, opts ...nats.PubOpt) (nats.PubAckFuture, error)
func (*JetStreamContext) PullSubscribe ¶
func (jsc *JetStreamContext) PullSubscribe(subj string, durable string, opts ...nats.SubOpt) (*nats.Subscription, error)
func (*JetStreamContext) StreamInfo ¶
func (jsc *JetStreamContext) StreamInfo(stream string, opts ...nats.JSOpt) (*nats.StreamInfo, error)
type NatsConn ¶
NatsConn is a wrapper of nats.Conn, which implements our own magic for auto reconnection.
func NewNatsConn ¶
NewNatsConn returns a NatsConn instance
func (*NatsConn) Close ¶
func (nc *NatsConn) Close()
Close function closes the underlying Nats connection.
func (*NatsConn) IsConnected ¶
IsConnected function implements the magic to check if the connection is OK. It utilize the dedicated JetStreamContext to call AccountInfo() function, and check if it works for determination. To reduce occasionality, it checks 3 times if there's a failure.