natsmq

package
v0.2.2 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 12, 2021 License: Apache-2.0 Imports: 15 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func DefaultAckHandler

func DefaultAckHandler(nuid string, err error)

func NewConn

func NewConn(ctx context.Context, brokersAddr string, opts ...nats.Option) (*nats.Conn, error)

func NewEncodedConn

func NewEncodedConn(ctx context.Context, brokersAddr string, opts ...nats.Option) (*nats.EncodedConn, error)

func NewStreamConn

func NewStreamConn(clusterId, clientId string, opts ...stan.Option) (stan.Conn, error)

Types

type AckManager

type AckManager struct {
	Dump    *storages.Dump
	Version int64
	Lock    *sync.RWMutex
	Rand    *rand.Rand
	// contains filtered or unexported fields
}

func NewAckTimestampManager

func NewAckTimestampManager(dumpFileName string, flushDelay time.Duration) *AckManager

func (*AckManager) Flush

func (a *AckManager) Flush()

func (*AckManager) Get

func (a *AckManager) Get() uint64

func (*AckManager) OnFlushComplete

func (a *AckManager) OnFlushComplete()

func (*AckManager) Set

func (a *AckManager) Set(sequence uint64)

type NatsConn

type NatsConn struct {
	Conn   *nats.Conn
	Tracer opentracing.Tracer
	Logger *zap.SugaredLogger
}

type NatsEncodedConn

type NatsEncodedConn struct {
	Conn   *nats.EncodedConn
	Tracer opentracing.Tracer
	Logger *zap.SugaredLogger
}

type NatsSub

type NatsSub struct {
	Tracer opentracing.Tracer
	Logger *zap.SugaredLogger
	// contains filtered or unexported fields
}

func NewSubscription

func NewSubscription(ctx context.Context, lg *zap.SugaredLogger, brokerAddr, subject string, opts ...nats.Option) (*NatsSub, error)

func NewSubscriptionWithTracing

func NewSubscriptionWithTracing(ctx context.Context, lg *zap.SugaredLogger, tracer opentracing.Tracer, brokerAddr, subject string, opts ...nats.Option) (*NatsSub, error)

func (*NatsSub) Errors

func (ns *NatsSub) Errors() <-chan error

func (*NatsSub) Messages

func (ns *NatsSub) Messages() <-chan *nats.Msg

func (*NatsSub) StartConsumption

func (ns *NatsSub) StartConsumption(ctx context.Context, handler func(data []byte) error)

func (*NatsSub) Stop

func (ns *NatsSub) Stop() error

type StanConn

type StanConn struct {
	Tracer opentracing.Tracer
	// contains filtered or unexported fields
}

func NewStanConn

func NewStanConn(ctx context.Context, lg *zap.SugaredLogger, clusterId, brokerAddr, clientId string, opts ...nats.Option) (*StanConn, error)

func (*StanConn) DefaultAckHandler added in v0.1.5

func (sc *StanConn) DefaultAckHandler(nuid string, err error)

func (*StanConn) SendAsyncMessage

func (sc *StanConn) SendAsyncMessage(topic string, data interface{}, handler stan.AckHandler)

func (*StanConn) SendMessage

func (sc *StanConn) SendMessage(topic string, data interface{})

func (*StanConn) Stop

func (sc *StanConn) Stop()

type StanSub

type StanSub struct {
	Tracer opentracing.Tracer
	Logger *zap.SugaredLogger
	// contains filtered or unexported fields
}

func NewChanSub

func NewChanSub(ctx context.Context, lg *zap.SugaredLogger, storageDirPath, clusterId, brokerAddr, channel string, opts ...nats.Option) (*StanSub, error)

NewChanSub creates a connection whose client ID is named after the channel, and creates a subscription bound to a context that lasts for the whole service lifetime.

func NewChanSubWithTracer

func NewChanSubWithTracer(ctx context.Context, lg *zap.SugaredLogger, tracer opentracing.Tracer, storageDirPath, clusterId, channel, brokerAddr string, opts ...nats.Option) (*StanSub, error)

func (*StanSub) Errors

func (s *StanSub) Errors() <-chan error

func (*StanSub) Messages

func (s *StanSub) Messages() <-chan *stan.Msg

func (*StanSub) StartConsumption

func (s *StanSub) StartConsumption(ctx context.Context, handler func(data []byte) error)

func (*StanSub) Stop

func (s *StanSub) Stop()

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL