Documentation
¶
Overview ¶
Package jsm provides client helpers for managing and interacting with NATS JetStream
Index ¶
- Constants
- Variables
- func APISubject(subject string, prefix string, domain string) string
- func DirectSubject(stream string) (string, error)
- func EventSubject(subject string, prefix string) string
- func FilterServerMetadata(metadata map[string]string) map[string]string
- func IsErrorResponse(m *nats.Msg) bool
- func IsInternalStream(s string) bool
- func IsKVBucketStream(s string) bool
- func IsMQTTStateStream(s string) bool
- func IsNatsError(err error, code uint16) bool
- func IsOKResponse(m *nats.Msg) bool
- func IsObjectBucketStream(s string) bool
- func IsValidName(n string) bool
- func LinearBackoffPeriods(steps uint, min time.Duration, max time.Duration) ([]time.Duration, error)
- func NewConsumerConfiguration(dflt api.ConsumerConfig, opts ...ConsumerOption) (*api.ConsumerConfig, error)
- func NewStreamConfiguration(template api.StreamConfig, opts ...StreamOption) (*api.StreamConfig, error)
- func NextSubject(stream string, consumer string) (string, error)
- func ParseDuration(d string) (time.Duration, error)
- func ParseErrorResponse(m *nats.Msg) error
- func ParseEvent(e []byte) (schema string, event any, err error)
- func ParsePubAck(m *nats.Msg) (*api.PubAck, error)
- func ServerCidString(kind int, id uint64) string
- func ServerKindString(kind int) string
- func SubjectIsSubsetMatch(subject, test string) bool
- type Consumer
- func (c *Consumer) AckPolicy() api.AckPolicy
- func (c *Consumer) AckSampleSubject() string
- func (c *Consumer) AckWait() time.Duration
- func (c *Consumer) AcknowledgedFloor() (api.SequenceInfo, error)
- func (c *Consumer) AdvisorySubject() string
- func (c *Consumer) Backoff() []time.Duration
- func (c *Consumer) ClusterInfo() (api.ClusterInfo, error)
- func (c *Consumer) Configuration() (config api.ConsumerConfig)
- func (c *Consumer) Delete() (err error)
- func (c *Consumer) DeliverGroup() string
- func (c *Consumer) DeliverPolicy() api.DeliverPolicy
- func (c *Consumer) DeliveredState() (api.SequenceInfo, error)
- func (c *Consumer) DeliverySubject() string
- func (c *Consumer) Description() string
- func (c *Consumer) DurableName() string
- func (c *Consumer) FilterSubject() string
- func (c *Consumer) FilterSubjects() []string
- func (c *Consumer) FlowControl() bool
- func (c *Consumer) Heartbeat() time.Duration
- func (c *Consumer) InactiveThreshold() time.Duration
- func (c *Consumer) IsDurable() bool
- func (c *Consumer) IsEphemeral() bool
- func (c *Consumer) IsHeadersOnly() bool
- func (c *Consumer) IsOverflowPriority() bool
- func (c *Consumer) IsPinnedClientPriority() bool
- func (c *Consumer) IsPullMode() bool
- func (c *Consumer) IsPushMode() bool
- func (c *Consumer) IsSampled() bool
- func (c *Consumer) LatestState() (api.ConsumerInfo, error)
- func (c *Consumer) LeaderStepDown(placement ...*api.Placement) error
- func (c *Consumer) MaxAckPending() int
- func (c *Consumer) MaxDeliver() int
- func (c *Consumer) MaxRequestBatch() int
- func (c *Consumer) MaxRequestExpires() time.Duration
- func (c *Consumer) MaxRequestMaxBytes() int
- func (c *Consumer) MaxWaiting() int
- func (c *Consumer) MemoryStorage() bool
- func (c *Consumer) Metadata() map[string]string
- func (c *Consumer) MetricSubject() string
- func (c *Consumer) Name() string
- func (c *Consumer) NextMsg() (*nats.Msg, error)
- func (c *Consumer) NextMsgContext(ctx context.Context) (*nats.Msg, error)
- func (c *Consumer) NextMsgRequest(inbox string, req *api.JSApiConsumerGetNextRequest) error
- func (c *Consumer) NextSubject() string
- func (c *Consumer) Pause(deadline time.Time) (*api.JSApiConsumerPauseResponse, error)
- func (c *Consumer) PauseUntil() time.Time
- func (c *Consumer) PendingAcknowledgement() (int, error)
- func (c *Consumer) PendingMessages() (uint64, error)
- func (c *Consumer) PriorityGroups() []string
- func (c *Consumer) PriorityPolicy() api.PriorityPolicy
- func (c *Consumer) RateLimit() uint64
- func (c *Consumer) RedeliveryCount() (int, error)
- func (c *Consumer) ReplayPolicy() api.ReplayPolicy
- func (c *Consumer) Replicas() int
- func (c *Consumer) Reset() error
- func (c *Consumer) Resume() error
- func (c *Consumer) SampleFrequency() string
- func (c *Consumer) StartSequence() uint64
- func (c *Consumer) StartTime() time.Time
- func (c *Consumer) State() (api.ConsumerInfo, error)
- func (c *Consumer) StreamName() string
- func (c *Consumer) Unpin(group string) error
- func (c *Consumer) UpdateConfiguration(opts ...ConsumerOption) error
- func (c *Consumer) WaitingClientPulls() (int, error)
- type ConsumerOption
- func AckWait(t time.Duration) ConsumerOption
- func AcknowledgeAll() ConsumerOption
- func AcknowledgeExplicit() ConsumerOption
- func AcknowledgeNone() ConsumerOption
- func BackoffIntervals(i ...time.Duration) ConsumerOption
- func BackoffPolicy(policy []time.Duration) ConsumerOption
- func ConsumerDescription(d string) ConsumerOption
- func ConsumerMetadata(meta map[string]string) ConsumerOption
- func ConsumerName(s string) ConsumerOption
- func ConsumerOverrideMemoryStorage() ConsumerOption
- func ConsumerOverrideReplicas(r int) ConsumerOption
- func DeliverAllAvailable() ConsumerOption
- func DeliverBodies() ConsumerOption
- func DeliverGroup(g string) ConsumerOption
- func DeliverHeadersOnly() ConsumerOption
- func DeliverLastPerSubject() ConsumerOption
- func DeliverySubject(s string) ConsumerOption
- func DurableName(s string) ConsumerOption
- func FilterStreamBySubject(s ...string) ConsumerOption
- func IdleHeartbeat(hb time.Duration) ConsumerOption
- func InactiveThreshold(t time.Duration) ConsumerOption
- func LinearBackoffPolicy(steps uint, min time.Duration, max time.Duration) ConsumerOption
- func MaxAckPending(pending uint) ConsumerOption
- func MaxDeliveryAttempts(n int) ConsumerOption
- func MaxRequestBatch(max uint) ConsumerOption
- func MaxRequestExpires(max time.Duration) ConsumerOption
- func MaxRequestMaxBytes(max int) ConsumerOption
- func MaxWaiting(pulls uint) ConsumerOption
- func OverflowPriorityGroups(groups ...string) ConsumerOption
- func PauseUntil(deadline time.Time) ConsumerOption
- func PinnedClientPriorityGroups(ttl time.Duration, groups ...string) ConsumerOption
- func PrioritizedPriorityGroups(groups ...string) ConsumerOption
- func PushFlowControl() ConsumerOption
- func RateLimitBitsPerSecond(bps uint64) ConsumerOption
- func ReplayAsReceived() ConsumerOption
- func ReplayInstantly() ConsumerOption
- func SamplePercent(i int) ConsumerOption
- func StartAtSequence(s uint64) ConsumerOption
- func StartAtTime(t time.Time) ConsumerOption
- func StartAtTimeDelta(d time.Duration) ConsumerOption
- func StartWithLastReceived() ConsumerOption
- func StartWithNextReceived() ConsumerOption
- type ConsumerQueryOpt
- func ConsumerQueryApiLevelMin(level int) ConsumerQueryOpt
- func ConsumerQueryExpression(e string) ConsumerQueryOpt
- func ConsumerQueryInvert() ConsumerQueryOpt
- func ConsumerQueryIsBound() ConsumerQueryOpt
- func ConsumerQueryIsPinned() ConsumerQueryOpt
- func ConsumerQueryIsPull() ConsumerQueryOpt
- func ConsumerQueryIsPush() ConsumerQueryOpt
- func ConsumerQueryLeaderServer(server string) ConsumerQueryOpt
- func ConsumerQueryOlderThan(age time.Duration) ConsumerQueryOpt
- func ConsumerQueryReplicas(r uint) ConsumerQueryOpt
- func ConsumerQueryWithDeliverySince(age time.Duration) ConsumerQueryOpt
- func ConsumerQueryWithFewerAckPending(pending int) ConsumerQueryOpt
- func ConsumerQueryWithFewerPending(pending uint64) ConsumerQueryOpt
- func ConsumerQueryWithFewerWaiting(waiting int) ConsumerQueryOpt
- type Manager
- func (m *Manager) ConsumerNames(stream string) (names []string, err error)
- func (m *Manager) Consumers(stream string) (consumers []*Consumer, missing []string, offline map[string]string, err error)
- func (m *Manager) DeleteConsumer(stream string, consumer string) error
- func (m *Manager) DeleteStream(stream string) error
- func (m *Manager) DeleteStreamMessage(stream string, seq uint64, noErase bool) error
- func (m *Manager) EachStream(filter *StreamNamesFilter, cb func(*Stream)) (missing []string, offline map[string]string, err error)
- func (m *Manager) IsJetStreamEnabled() bool
- func (m *Manager) IsKnownConsumer(stream string, consumer string) (bool, error)
- func (m *Manager) IsKnownStream(stream string) (bool, error)
- func (m *Manager) IsPedantic() bool
- func (m *Manager) IsStreamMaxBytesRequired() (bool, error)
- func (m *Manager) JetStreamAccountInfo() (info *api.JetStreamAccountStats, err error)
- func (m *Manager) LoadConsumer(stream string, name string) (consumer *Consumer, err error)
- func (m *Manager) LoadFromStreamDetailBytes(sd []byte) (stream *Stream, consumers []*Consumer, err error)
- func (m *Manager) LoadOrNewConsumer(stream string, name string, opts ...ConsumerOption) (consumer *Consumer, err error)
- func (m *Manager) LoadOrNewConsumerFromDefault(stream string, name string, template api.ConsumerConfig, ...) (consumer *Consumer, err error)
- func (m *Manager) LoadOrNewStream(name string, opts ...StreamOption) (stream *Stream, err error)
- func (m *Manager) LoadOrNewStreamFromDefault(name string, dflt api.StreamConfig, opts ...StreamOption) (stream *Stream, err error)
- func (m *Manager) LoadStream(name string) (stream *Stream, err error)
- func (m *Manager) MetaApiLevel(refresh bool) (int, error)
- func (m *Manager) MetaLeaderStandDown(placement *api.Placement) error
- func (m *Manager) MetaPeerRemove(name string, id string) error
- func (m *Manager) MetaPurgeAccount(account string) error
- func (m *Manager) NatsConn() *nats.Conn
- func (m *Manager) NewConsumer(stream string, opts ...ConsumerOption) (consumer *Consumer, err error)
- func (m *Manager) NewConsumerFromDefault(stream string, dflt api.ConsumerConfig, opts ...ConsumerOption) (consumer *Consumer, err error)
- func (m *Manager) NewStream(name string, opts ...StreamOption) (stream *Stream, err error)
- func (m *Manager) NewStreamConfiguration(template api.StreamConfig, opts ...StreamOption) (*api.StreamConfig, error)
- func (m *Manager) NewStreamFromDefault(name string, dflt api.StreamConfig, opts ...StreamOption) (stream *Stream, err error)
- func (m *Manager) NextMsg(stream string, consumer string) (*nats.Msg, error)
- func (m *Manager) NextMsgContext(ctx context.Context, stream string, consumer string) (*nats.Msg, error)
- func (m *Manager) NextMsgRequest(stream string, consumer string, inbox string, ...) error
- func (m *Manager) NextSubject(stream string, consumer string) (string, error)
- func (m *Manager) QueryStreams(opts ...StreamQueryOpt) ([]*Stream, error)
- func (m *Manager) ReadLastMessageForSubject(stream string, sub string) (msg *api.StoredMsg, err error)
- func (m *Manager) RestoreSnapshotFromBuffer(ctx context.Context, stream string, dataReader, metadataReader io.ReadCloser, ...) (*api.StreamState, error)
- func (m *Manager) RestoreSnapshotFromDirectory(ctx context.Context, stream string, dir string, opts ...SnapshotOption) (RestoreProgress, *api.StreamState, error)
- func (m *Manager) StreamContainedSubjects(stream string, filter ...string) (map[string]uint64, error)
- func (m *Manager) StreamNames(filter *StreamNamesFilter) (names []string, err error)
- func (m *Manager) Streams(filter *StreamNamesFilter) (streams []*Stream, missing []string, offline map[string]string, err error)
- type MsgInfo
- type Option
- type PagerOption
- type RestoreProgress
- type SnapshotOption
- func RestoreConfiguration(cfg api.StreamConfig) SnapshotOption
- func RestoreNotify(cb func(RestoreProgress)) SnapshotOption
- func SnapshotChunkSize(sz int) SnapshotOption
- func SnapshotConsumers() SnapshotOption
- func SnapshotDebug() SnapshotOption
- func SnapshotHealthCheck() SnapshotOption
- func SnapshotNotify(cb func(SnapshotProgress)) SnapshotOption
- func WithProgress(sz int) SnapshotOption
- type SnapshotProgress
- type Stream
- func (s *Stream) AdvisorySubject() string
- func (s *Stream) AllowMsgTTL() bool
- func (s *Stream) AtomicBatchPublishAllowed() bool
- func (s *Stream) ClusterInfo() (api.ClusterInfo, error)
- func (s *Stream) Compression() api.Compression
- func (s *Stream) Configuration() api.StreamConfig
- func (s *Stream) ConsumerLimits() api.StreamConsumerLimits
- func (s *Stream) ConsumerNames() (names []string, err error)
- func (s *Stream) ContainedSubjects(filter ...string) (map[string]uint64, error)
- func (s *Stream) CounterAllowed() bool
- func (s *Stream) Delete() error
- func (s *Stream) DeleteAllowed() bool
- func (s *Stream) DeleteMessage(seq uint64) (err error)deprecated
- func (s *Stream) DeleteMessageRequest(req api.JSApiMsgDeleteRequest) (err error)
- func (s *Stream) Description() string
- func (s *Stream) DetectGaps(ctx context.Context, progress func(seq uint64, pending uint64), ...) error
- func (s *Stream) DirectAllowed() bool
- func (s *Stream) DirectGet(ctx context.Context, req api.JSApiMsgGetRequest, handler func(msg *nats.Msg)) (numPending uint64, lastSeq uint64, upToSeq uint64, err error)
- func (s *Stream) DirectSubject() string
- func (s *Stream) DiscardNewPerSubject() bool
- func (s *Stream) DiscardPolicy() api.DiscardPolicy
- func (s *Stream) DuplicateWindow() time.Duration
- func (s *Stream) EachConsumer(cb func(consumer *Consumer)) (missing []string, offline map[string]string, err error)
- func (s *Stream) FastDeleteMessage(seq uint64) errordeprecated
- func (s *Stream) FirstSequence() uint64
- func (s *Stream) Information(req ...api.JSApiStreamInfoRequest) (info *api.StreamInfo, err error)
- func (s *Stream) IsCompressed() bool
- func (s *Stream) IsInternal() bool
- func (s *Stream) IsKVBucket() bool
- func (s *Stream) IsMQTTState() bool
- func (s *Stream) IsMirror() bool
- func (s *Stream) IsObjectBucket() bool
- func (s *Stream) IsRepublishing() bool
- func (s *Stream) IsSourced() bool
- func (s *Stream) IsTemplateManaged() bool
- func (s *Stream) LatestInformation() (info *api.StreamInfo, err error)
- func (s *Stream) LatestState() (state api.StreamState, err error)
- func (s *Stream) LeaderStepDown(placement ...*api.Placement) error
- func (s *Stream) LoadConsumer(name string) (*Consumer, error)
- func (s *Stream) LoadOrNewConsumer(name string, opts ...ConsumerOption) (consumer *Consumer, err error)
- func (s *Stream) LoadOrNewConsumerFromDefault(name string, deflt api.ConsumerConfig, opts ...ConsumerOption) (consumer *Consumer, err error)
- func (s *Stream) MaxAge() time.Duration
- func (s *Stream) MaxBytes() int64
- func (s *Stream) MaxConsumers() int
- func (s *Stream) MaxMsgSize() int32
- func (s *Stream) MaxMsgs() int64
- func (s *Stream) MaxMsgsPerSubject() int64
- func (s *Stream) Metadata() map[string]string
- func (s *Stream) MetricSubject() string
- func (s *Stream) Mirror() *api.StreamSource
- func (s *Stream) MirrorDirectAllowed() bool
- func (s *Stream) Name() string
- func (s *Stream) NewConsumer(opts ...ConsumerOption) (consumer *Consumer, err error)
- func (s *Stream) NewConsumerFromDefault(dflt api.ConsumerConfig, opts ...ConsumerOption) (consumer *Consumer, err error)
- func (s *Stream) NoAck() bool
- func (s *Stream) PageContents(opts ...PagerOption) (*StreamPager, error)
- func (s *Stream) PersistenceMode() api.PersistModeType
- func (s *Stream) Purge(opts ...*api.JSApiStreamPurgeRequest) error
- func (s *Stream) PurgeAllowed() bool
- func (s *Stream) QueryConsumers(opts ...ConsumerQueryOpt) ([]*Consumer, error)
- func (s *Stream) ReadLastMessageForSubject(subj string) (*api.StoredMsg, error)
- func (s *Stream) ReadMessage(seq uint64) (msg *api.StoredMsg, err error)
- func (s *Stream) RemoveRAFTPeer(peer string) error
- func (s *Stream) Replicas() int
- func (s *Stream) Republish() *api.RePublish
- func (s *Stream) Reset() error
- func (s *Stream) Retention() api.RetentionPolicy
- func (s *Stream) RollupAllowed() bool
- func (s *Stream) SchedulesAllowed() bool
- func (s *Stream) Seal() error
- func (s *Stream) Sealed() bool
- func (s *Stream) SnapshotToBuffer(ctx context.Context, dataBuffer, metadataBuffer io.WriteCloser, ...) (SnapshotProgress, error)
- func (s *Stream) SnapshotToDirectory(ctx context.Context, dir string, opts ...SnapshotOption) (SnapshotProgress, error)
- func (s *Stream) Sources() []*api.StreamSource
- func (s *Stream) State(req ...api.JSApiStreamInfoRequest) (stats api.StreamState, err error)
- func (s *Stream) Storage() api.StorageType
- func (s *Stream) SubjectDeleteMarkerTTL() time.Duration
- func (s *Stream) Subjects() []string
- func (s *Stream) Template() string
- func (s *Stream) UpdateConfiguration(cfg api.StreamConfig, opts ...StreamOption) error
- type StreamNamesFilter
- type StreamOption
- func AllowAtomicBatchPublish() StreamOption
- func AllowCounter() StreamOption
- func AllowDirect() StreamOption
- func AllowMsgTTL() StreamOption
- func AllowRollup() StreamOption
- func AllowSchedules() StreamOption
- func AppendSource(source *api.StreamSource) StreamOption
- func AsyncPersistence() StreamOption
- func Compression(alg api.Compression) StreamOption
- func ConsumerLimits(limits api.StreamConsumerLimits) StreamOption
- func DenyDelete() StreamOption
- func DenyPurge() StreamOption
- func DiscardNew() StreamOption
- func DiscardNewPerSubject() StreamOption
- func DiscardOld() StreamOption
- func DuplicateWindow(d time.Duration) StreamOption
- func FileStorage() StreamOption
- func FirstSequence(seq uint64) StreamOption
- func InterestRetention() StreamOption
- func LimitsRetention() StreamOption
- func MaxAge(m time.Duration) StreamOption
- func MaxBytes(m int64) StreamOption
- func MaxConsumers(m int) StreamOption
- func MaxMessageSize(m int32) StreamOption
- func MaxMessages(m int64) StreamOption
- func MaxMessagesPerSubject(m int64) StreamOption
- func MemoryStorage() StreamOption
- func Mirror(stream *api.StreamSource) StreamOption
- func MirrorDirect() StreamOption
- func NoAck() StreamOption
- func NoAllowAtomicBatchPublish() StreamOption
- func NoAllowCounter() StreamOption
- func NoAllowDirect() StreamOption
- func NoMirrorDirect() StreamOption
- func PlacementCluster(cluster string) StreamOption
- func PlacementPreferredLeader(leader string) StreamOption
- func PlacementTags(tags ...string) StreamOption
- func Replicas(r int) StreamOption
- func Republish(m *api.RePublish) StreamOption
- func Sources(streams ...*api.StreamSource) StreamOption
- func StreamDescription(d string) StreamOption
- func StreamMetadata(meta map[string]string) StreamOption
- func SubjectDeleteMarkerTTL(d time.Duration) StreamOption
- func SubjectTransform(subjectTransform *api.SubjectTransformConfig) StreamOption
- func Subjects(s ...string) StreamOption
- func WorkQueueRetention() StreamOption
- type StreamPager
- type StreamQueryOpt
- func StreamQueryApiLevelMin(level int) StreamQueryOpt
- func StreamQueryClusterName(c string) StreamQueryOpt
- func StreamQueryExpression(e string) StreamQueryOpt
- func StreamQueryFewerConsumersThan(c uint) StreamQueryOpt
- func StreamQueryIdleLongerThan(p time.Duration) StreamQueryOpt
- func StreamQueryInvert() StreamQueryOpt
- func StreamQueryIsMirror() StreamQueryOpt
- func StreamQueryIsSourced() StreamQueryOpt
- func StreamQueryLeaderServer(server string) StreamQueryOpt
- func StreamQueryOlderThan(p time.Duration) StreamQueryOpt
- func StreamQueryReplicas(r uint) StreamQueryOpt
- func StreamQueryServerName(s string) StreamQueryOpt
- func StreamQuerySubjectWildcard(s string) StreamQueryOpt
- func StreamQueryWithoutMessages() StreamQueryOpt
Constants ¶
const ( StatusHdr string = "Status" DescriptionHdr string = "Description" )
Variables ¶
var DefaultConsumer = api.ConsumerConfig{ DeliverPolicy: api.DeliverAll, AckPolicy: api.AckExplicit, AckWait: 30 * time.Second, ReplayPolicy: api.ReplayInstant, }
DefaultConsumer is the configuration that will be used to create new Consumers in NewConsumer
var DefaultStream = api.StreamConfig{ Retention: api.LimitsPolicy, Discard: api.DiscardOld, MaxConsumers: -1, MaxMsgs: -1, MaxMsgsPer: -1, MaxBytes: -1, MaxAge: 24 * 365 * time.Hour, MaxMsgSize: -1, Replicas: 1, NoAck: false, }
DefaultStream is a template configuration with StreamPolicy retention and 1 years maximum age. No storage type or subjects are set
var DefaultStreamConfiguration = DefaultStream
DefaultStreamConfiguration is the configuration that will be used to create new Streams in NewStream
var DefaultWorkQueue = api.StreamConfig{ Retention: api.WorkQueuePolicy, Discard: api.DiscardOld, MaxConsumers: -1, MaxMsgs: -1, MaxMsgsPer: -1, MaxBytes: -1, MaxAge: 24 * 365 * time.Hour, MaxMsgSize: -1, Replicas: api.StreamDefaultReplicas, NoAck: false, }
DefaultWorkQueue is a template configuration with WorkQueuePolicy retention and 1 years maximum age. No storage type or subjects are set
var ErrMemoryStreamNotSupported error = errors.New("memory streams do not support snapshots")
ErrMemoryStreamNotSupported is an error indicating a memory stream was being snapshotted which is not supported
var ErrNoExprLangBuild = fmt.Errorf("binary has been built with `noexprlang` build tag and thus does not support expression matching")
ErrNoExprLangBuild warns that expression matching is disabled when compiling a go binary with the `noexprlang` build tag.
var SampledDefaultConsumer = api.ConsumerConfig{ DeliverPolicy: api.DeliverAll, AckPolicy: api.AckExplicit, AckWait: 30 * time.Second, ReplayPolicy: api.ReplayInstant, SampleFrequency: "100%", }
SampledDefaultConsumer is the configuration that will be used to create new Consumers in NewConsumer
Functions ¶
func APISubject ¶ added in v0.0.21
APISubject returns API subject with prefix applied
func DirectSubject ¶ added in v0.2.0
func EventSubject ¶ added in v0.0.21
EventSubject returns Event subject with prefix applied
func FilterServerMetadata ¶ added in v0.2.0
FilterServerMetadata copies metadata with the server generated metadata removed
func IsErrorResponse ¶
IsErrorResponse checks if the message holds a standard JetStream error
func IsInternalStream ¶ added in v0.0.27
IsInternalStream indicates if a stream is considered 'internal' by the NATS team, that is, it's a backing stream for KV, Object or MQTT state
func IsKVBucketStream ¶ added in v0.0.27
IsKVBucketStream determines if a stream is a KV bucket
func IsMQTTStateStream ¶ added in v0.0.27
IsMQTTStateStream determines if a stream holds internal MQTT state
func IsNatsError ¶ added in v0.0.25
IsNatsError checks if err is a ApiErr matching code
func IsOKResponse ¶
IsOKResponse checks if the message holds a standard JetStream error
func IsObjectBucketStream ¶ added in v0.0.27
IsObjectBucketStream determines if a stream is a Object bucket
func IsValidName ¶ added in v0.0.18
IsValidName verifies if n is a valid stream, template or consumer name
func LinearBackoffPeriods ¶ added in v0.0.29
func LinearBackoffPeriods(steps uint, min time.Duration, max time.Duration) ([]time.Duration, error)
LinearBackoffPeriods creates a backoff policy without any jitter suitable for use in a consumer backoff policy
The periods start from min and increase linearly until ~max
func NewConsumerConfiguration ¶
func NewConsumerConfiguration(dflt api.ConsumerConfig, opts ...ConsumerOption) (*api.ConsumerConfig, error)
NewConsumerConfiguration generates a new configuration based on template modified by opts
func NewStreamConfiguration ¶
func NewStreamConfiguration(template api.StreamConfig, opts ...StreamOption) (*api.StreamConfig, error)
NewStreamConfiguration generates a new configuration based on template modified by opts
func NextSubject ¶
NextSubject returns the subject used to retrieve the next message for pull-based Consumers, empty when not a pull-base consumer
func ParseDuration ¶ added in v0.2.0
ParseDuration parse durations with additional units over those from standard go parser.
In addition to normal go parser time units it also supports these.
The reason these are not in go standard lib is due to precision around how many days in a month and about leap years and leap seconds. This function does nothing to try and correct for those.
* "w", "W" - a week based on 7 days of exactly 24 hours * "d", "D" - a day based on 24 hours * "M" - a month made of 30 days of 24 hours * "y", "Y" - a year made of 365 days of 24 hours each
Valid duration strings can be -1y1d1µs
func ParseErrorResponse ¶
ParseErrorResponse parses the JetStream response, if it's an error returns an error instance holding the message else nil
func ParseEvent ¶
ParseEvent parses event e and returns event as for example *api.ConsumerAckMetric, all unknown event schemas will be of type *UnknownMessage
func ParsePubAck ¶ added in v0.0.25
ParsePubAck parses a stream publish response and returns an error if the publish failed or parsing failed
func ServerCidString ¶ added in v0.2.0
ServerCidString takes a kind like server.CLIENT a similar cid like the server would, eg cid:10
func ServerKindString ¶ added in v0.2.0
ServerKindString takes a kind like server.CLIENT and returns a string describing it
func SubjectIsSubsetMatch ¶ added in v0.0.33
SubjectIsSubsetMatch tests if a subject matches a standard nats wildcard
Types ¶
type Consumer ¶
Consumer represents a JetStream consumer
func (*Consumer) AckSampleSubject ¶
AckSampleSubject is the subject used to publish ack samples to
func (*Consumer) AcknowledgedFloor ¶
func (c *Consumer) AcknowledgedFloor() (api.SequenceInfo, error)
AcknowledgedFloor reports the highest contiguous message sequences that were acknowledged
func (*Consumer) AdvisorySubject ¶
AdvisorySubject is a wildcard subscription subject that subscribes to all advisories for this consumer
func (*Consumer) ClusterInfo ¶ added in v0.2.0
func (c *Consumer) ClusterInfo() (api.ClusterInfo, error)
func (*Consumer) Configuration ¶
func (c *Consumer) Configuration() (config api.ConsumerConfig)
Configuration is the Consumer configuration
func (*Consumer) Delete ¶
Delete deletes the Consumer, after this the Consumer object should be disposed
func (*Consumer) DeliverGroup ¶ added in v0.0.26
func (*Consumer) DeliverPolicy ¶
func (c *Consumer) DeliverPolicy() api.DeliverPolicy
func (*Consumer) DeliveredState ¶
func (c *Consumer) DeliveredState() (api.SequenceInfo, error)
DeliveredState reports the messages sequences that were successfully delivered
func (*Consumer) DeliverySubject ¶
func (*Consumer) Description ¶ added in v0.0.26
func (*Consumer) DurableName ¶
func (*Consumer) FilterSubject ¶
func (*Consumer) FilterSubjects ¶ added in v0.1.0
func (*Consumer) FlowControl ¶ added in v0.0.21
func (*Consumer) InactiveThreshold ¶ added in v0.0.29
func (*Consumer) IsEphemeral ¶
func (*Consumer) IsHeadersOnly ¶ added in v0.0.27
func (*Consumer) IsOverflowPriority ¶ added in v0.2.0
func (*Consumer) IsPinnedClientPriority ¶ added in v0.2.0
func (*Consumer) IsPullMode ¶
func (*Consumer) IsPushMode ¶
func (*Consumer) LatestState ¶ added in v0.0.23
func (c *Consumer) LatestState() (api.ConsumerInfo, error)
LatestState returns the most recently loaded state
func (*Consumer) LeaderStepDown ¶ added in v0.0.21
LeaderStepDown requests the current RAFT group leader in a clustered JetStream to stand down forcing a new election, the election of the next leader can be influenced by placement
func (*Consumer) MaxAckPending ¶ added in v0.0.20
func (*Consumer) MaxDeliver ¶
func (*Consumer) MaxRequestBatch ¶ added in v0.0.29
func (*Consumer) MaxRequestExpires ¶ added in v0.0.29
func (*Consumer) MaxRequestMaxBytes ¶ added in v0.0.33
func (*Consumer) MaxWaiting ¶ added in v0.0.24
func (*Consumer) MemoryStorage ¶ added in v0.0.33
func (*Consumer) MetricSubject ¶
MetricSubject is a wildcard subscription subject that subscribes to all metrics for this consumer
func (*Consumer) NextMsg ¶
NextMsg retrieves the next message, waiting up to manager timeout for a response
func (*Consumer) NextMsgContext ¶ added in v0.0.19
NextMsgContext retrieves the next message, interrupted by the cancel context ctx
func (*Consumer) NextMsgRequest ¶ added in v0.0.20
func (c *Consumer) NextMsgRequest(inbox string, req *api.JSApiConsumerGetNextRequest) error
NextMsgRequest creates a request for a batch of messages, data or control flow messages will be sent to inbox
func (*Consumer) NextSubject ¶
NextSubject returns the subject used to retrieve the next message for pull-based Consumers, empty when not a pull-base consumer
func (*Consumer) Pause ¶ added in v0.2.0
Pause requests a consumer be paused until the deadline, if it fails to pause an error is returned.
A common reason for failures is when a time is supplied that is in the past from the perspective of the server
func (*Consumer) PauseUntil ¶ added in v0.2.0
func (*Consumer) PendingAcknowledgement ¶ added in v0.0.20
PendingAcknowledgement reports the number of messages sent but not yet acknowledged
func (*Consumer) PendingMessages ¶ added in v0.0.20
PendingMessages is the number of unprocessed messages for this consumer
func (*Consumer) PriorityGroups ¶ added in v0.2.0
func (*Consumer) PriorityPolicy ¶ added in v0.2.0
func (c *Consumer) PriorityPolicy() api.PriorityPolicy
func (*Consumer) RedeliveryCount ¶
RedeliveryCount reports the number of redelivers that were done
func (*Consumer) ReplayPolicy ¶
func (c *Consumer) ReplayPolicy() api.ReplayPolicy
func (*Consumer) SampleFrequency ¶
func (*Consumer) StartSequence ¶
func (*Consumer) State ¶
func (c *Consumer) State() (api.ConsumerInfo, error)
State loads a snapshot of consumer state including delivery counts, retries and more
func (*Consumer) StreamName ¶
func (*Consumer) Unpin ¶ added in v0.2.0
Unpin requests that the server unpins the current client from a grouped consumer
func (*Consumer) UpdateConfiguration ¶ added in v0.0.27
func (c *Consumer) UpdateConfiguration(opts ...ConsumerOption) error
UpdateConfiguration updates the consumer configuration At present the description, ack wait, max deliver, sample frequency, max ack pending, max waiting and header only settings can be changed
func (*Consumer) WaitingClientPulls ¶ added in v0.0.20
WaitingClientPulls is the number of clients that have outstanding pull requests against this consumer
type ConsumerOption ¶
type ConsumerOption func(o *api.ConsumerConfig) error
ConsumerOption configures consumers
func AckWait ¶
func AckWait(t time.Duration) ConsumerOption
AckWait sets the time a delivered message might remain unacknowledged before redelivery is attempted
func AcknowledgeAll ¶
func AcknowledgeAll() ConsumerOption
AcknowledgeAll enables an acknowledgement mode where acknowledging message 100 will also ack the preceding messages
func AcknowledgeExplicit ¶
func AcknowledgeExplicit() ConsumerOption
AcknowledgeExplicit requires that every message received be acknowledged
func AcknowledgeNone ¶
func AcknowledgeNone() ConsumerOption
AcknowledgeNone disables message acknowledgement
func BackoffIntervals ¶ added in v0.0.29
func BackoffIntervals(i ...time.Duration) ConsumerOption
BackoffIntervals sets a series of intervals by which retries will be attempted for this consumr
func BackoffPolicy ¶ added in v0.0.29
func BackoffPolicy(policy []time.Duration) ConsumerOption
BackoffPolicy sets a consumer policy
func ConsumerDescription ¶ added in v0.0.26
func ConsumerDescription(d string) ConsumerOption
ConsumerDescription is a textual description of this consumer to provide additional context
func ConsumerMetadata ¶ added in v0.1.0
func ConsumerMetadata(meta map[string]string) ConsumerOption
func ConsumerName ¶ added in v0.1.0
func ConsumerName(s string) ConsumerOption
ConsumerName sets a name for the consumer, when creating a durable consumer use DurableName, using ConsumerName allows for creating named ephemeral consumers, else a random name will be generated
func ConsumerOverrideMemoryStorage ¶ added in v0.0.33
func ConsumerOverrideMemoryStorage() ConsumerOption
func ConsumerOverrideReplicas ¶ added in v0.0.33
func ConsumerOverrideReplicas(r int) ConsumerOption
ConsumerOverrideReplicas override the replica count inherited from the Stream with this value
func DeliverAllAvailable ¶
func DeliverAllAvailable() ConsumerOption
DeliverAllAvailable delivers messages starting with the first available in the stream
func DeliverBodies ¶ added in v0.1.2
func DeliverBodies() ConsumerOption
DeliverBodies configures the consumer to deliver the headers and the bodies for each message. To only deliver headers only use DeliverHeadersOnly.
func DeliverGroup ¶ added in v0.0.26
func DeliverGroup(g string) ConsumerOption
DeliverGroup when set will only deliver messages to subscriptions matching that group
func DeliverHeadersOnly ¶ added in v0.0.27
func DeliverHeadersOnly() ConsumerOption
DeliverHeadersOnly configures the consumer to only deliver existing header and the `Nats-Msg-Size` header, no bodies. To deliver also the bodies use DeliverBodies.
func DeliverLastPerSubject ¶ added in v0.0.26
func DeliverLastPerSubject() ConsumerOption
DeliverLastPerSubject delivers the last message for each subject in a wildcard stream based on the filter subjects of the consumer
func DeliverySubject ¶
func DeliverySubject(s string) ConsumerOption
DeliverySubject is the subject where a Push consumer will deliver its messages
func DurableName ¶
func DurableName(s string) ConsumerOption
DurableName is the name given to the consumer, when not set an ephemeral consumer is created
func FilterStreamBySubject ¶
func FilterStreamBySubject(s ...string) ConsumerOption
FilterStreamBySubject filters the messages in a wildcard stream to those matching a specific subject
func IdleHeartbeat ¶ added in v0.0.21
func IdleHeartbeat(hb time.Duration) ConsumerOption
IdleHeartbeat sets the time before an idle consumer will send a empty message with Status header 100 indicating the consumer is still alive
func InactiveThreshold ¶ added in v0.0.29
func InactiveThreshold(t time.Duration) ConsumerOption
InactiveThreshold is the idle time an ephemeral consumer allows before it is removed
func LinearBackoffPolicy ¶ added in v0.0.29
LinearBackoffPolicy creates a backoff policy with linearly increasing steps between min and max
func MaxAckPending ¶ added in v0.0.20
func MaxAckPending(pending uint) ConsumerOption
MaxAckPending maximum number of messages without acknowledgement that can be outstanding, once this limit is reached message delivery will be suspended
func MaxDeliveryAttempts ¶
func MaxDeliveryAttempts(n int) ConsumerOption
MaxDeliveryAttempts is the number of times a message will be attempted to be delivered
func MaxRequestBatch ¶ added in v0.0.29
func MaxRequestBatch(max uint) ConsumerOption
MaxRequestBatch is the largest batch that can be specified when doing pulls against the consumer
func MaxRequestExpires ¶ added in v0.0.29
func MaxRequestExpires(max time.Duration) ConsumerOption
MaxRequestExpires is the longest pull request expire the server will allow
func MaxRequestMaxBytes ¶ added in v0.0.33
func MaxRequestMaxBytes(max int) ConsumerOption
MaxRequestMaxBytes sets the limit of max bytes a consumer my request
func MaxWaiting ¶ added in v0.0.24
func MaxWaiting(pulls uint) ConsumerOption
MaxWaiting is the number of outstanding pulls that are allowed on any one consumer. Pulls made that exceeds this limit are discarded.
func OverflowPriorityGroups ¶ added in v0.2.0
func OverflowPriorityGroups(groups ...string) ConsumerOption
OverflowPriorityGroups sets the consumer to support overflow pull requests
func PauseUntil ¶ added in v0.2.0
func PauseUntil(deadline time.Time) ConsumerOption
func PinnedClientPriorityGroups ¶ added in v0.2.0
func PinnedClientPriorityGroups(ttl time.Duration, groups ...string) ConsumerOption
PinnedClientPriorityGroups sets the consumer to be a pinned client priority consumer with a certain list of groups. When groups is empty the 'none' policy is set
func PrioritizedPriorityGroups ¶ added in v0.3.0
func PrioritizedPriorityGroups(groups ...string) ConsumerOption
PrioritizedPriorityGroups sets the consumer to be a prioritized priority consumer with a certain list of groups. When groups is empty the 'none' policy is set
func PushFlowControl ¶ added in v0.0.21
func PushFlowControl() ConsumerOption
PushFlowControl enables flow control for push based consumers
func RateLimitBitsPerSecond ¶ added in v0.0.18
func RateLimitBitsPerSecond(bps uint64) ConsumerOption
RateLimitBitsPerSecond limits message delivery to a rate in bits per second
func ReplayAsReceived ¶
func ReplayAsReceived() ConsumerOption
ReplayAsReceived delivers messages at the rate they were received at
func ReplayInstantly ¶
func ReplayInstantly() ConsumerOption
ReplayInstantly delivers messages to the consumer as fast as possible
func SamplePercent ¶
func SamplePercent(i int) ConsumerOption
SamplePercent configures sampling of a subset of messages expressed as a percentage
func StartAtSequence ¶
func StartAtSequence(s uint64) ConsumerOption
StartAtSequence starts consuming messages at a specific sequence in the stream
func StartAtTime ¶
func StartAtTime(t time.Time) ConsumerOption
StartAtTime starts consuming messages at a specific point in time in the stream
func StartAtTimeDelta ¶
func StartAtTimeDelta(d time.Duration) ConsumerOption
StartAtTimeDelta starts delivering messages at a past point in time
func StartWithLastReceived ¶
func StartWithLastReceived() ConsumerOption
StartWithLastReceived starts delivery at the last messages received in the stream
func StartWithNextReceived ¶
func StartWithNextReceived() ConsumerOption
StartWithNextReceived starts delivery at the next messages received in the stream
type ConsumerQueryOpt ¶ added in v0.2.0
type ConsumerQueryOpt func(query *consumerQuery) error
func ConsumerQueryApiLevelMin ¶ added in v0.2.0
func ConsumerQueryApiLevelMin(level int) ConsumerQueryOpt
ConsumerQueryApiLevelMin limits results to assets requiring API Level above or equal to level
func ConsumerQueryExpression ¶ added in v0.2.0
func ConsumerQueryExpression(e string) ConsumerQueryOpt
ConsumerQueryExpression filters the consumers using the expr expression language Using this option with a binary built with the `noexprlang` build tag will always return ErrNoExprLangBuild.
func ConsumerQueryInvert ¶ added in v0.2.0
func ConsumerQueryInvert() ConsumerQueryOpt
ConsumerQueryInvert inverts the logic of filters, older than becomes newer than and so forth
func ConsumerQueryIsBound ¶ added in v0.2.0
func ConsumerQueryIsBound() ConsumerQueryOpt
ConsumerQueryIsBound finds push consumers that are bound or pull consumers with waiting pulls
func ConsumerQueryIsPinned ¶ added in v0.2.0
func ConsumerQueryIsPinned() ConsumerQueryOpt
ConsumerQueryIsPinned finds consumers with pinned clients on all their groups
func ConsumerQueryIsPull ¶ added in v0.2.0
func ConsumerQueryIsPull() ConsumerQueryOpt
ConsumerQueryIsPull finds only Pull consumers
func ConsumerQueryIsPush ¶ added in v0.2.0
func ConsumerQueryIsPush() ConsumerQueryOpt
ConsumerQueryIsPush finds only Push consumers
func ConsumerQueryLeaderServer ¶ added in v0.2.0
func ConsumerQueryLeaderServer(server string) ConsumerQueryOpt
ConsumerQueryLeaderServer finds clustered consumers where a certain node is the leader
func ConsumerQueryOlderThan ¶ added in v0.2.0
func ConsumerQueryOlderThan(age time.Duration) ConsumerQueryOpt
ConsumerQueryOlderThan finds consumers older than age
func ConsumerQueryReplicas ¶ added in v0.2.0
func ConsumerQueryReplicas(r uint) ConsumerQueryOpt
ConsumerQueryReplicas finds streams with a certain number of replicas or less
func ConsumerQueryWithDeliverySince ¶ added in v0.2.0
func ConsumerQueryWithDeliverySince(age time.Duration) ConsumerQueryOpt
ConsumerQueryWithDeliverySince finds only consumers that has had deliveries since ts
func ConsumerQueryWithFewerAckPending ¶ added in v0.2.0
func ConsumerQueryWithFewerAckPending(pending int) ConsumerQueryOpt
ConsumerQueryWithFewerAckPending finds consumers with fewer pending messages
func ConsumerQueryWithFewerPending ¶ added in v0.2.0
func ConsumerQueryWithFewerPending(pending uint64) ConsumerQueryOpt
ConsumerQueryWithFewerPending finds consumers with fewer unprocessed messages
func ConsumerQueryWithFewerWaiting ¶ added in v0.2.0
func ConsumerQueryWithFewerWaiting(waiting int) ConsumerQueryOpt
ConsumerQueryWithFewerWaiting finds consumers with fewer waiting pulls
type Manager ¶ added in v0.0.19
func (*Manager) ConsumerNames ¶ added in v0.0.19
ConsumerNames is a sorted list of all known consumers within a stream
func (*Manager) Consumers ¶ added in v0.0.19
func (m *Manager) Consumers(stream string) (consumers []*Consumer, missing []string, offline map[string]string, err error)
Consumers is a sorted list of all known Consumers within a Stream and a list of any consumer names that were known but no details were found
func (*Manager) DeleteConsumer ¶ added in v0.0.34
DeleteConsumer removes a consumer without all the drama of loading it etc
func (*Manager) DeleteStream ¶ added in v0.0.34
DeleteStream removes a stream without all the drama of loading it etc
func (*Manager) DeleteStreamMessage ¶ added in v0.0.25
DeleteStreamMessage deletes a specific message from the Stream without erasing the data, see DeleteMessage() for a safe delete
func (*Manager) EachStream ¶ added in v0.0.19
func (m *Manager) EachStream(filter *StreamNamesFilter, cb func(*Stream)) (missing []string, offline map[string]string, err error)
EachStream iterates over all known Streams, Streams that the state is not known off are in the missing list and offline streams with reasons are in offline
func (*Manager) IsJetStreamEnabled ¶ added in v0.0.19
IsJetStreamEnabled determines if JetStream is enabled for the current account
func (*Manager) IsKnownConsumer ¶ added in v0.0.19
IsKnownConsumer determines if a Consumer is known for a specific Stream
func (*Manager) IsKnownStream ¶ added in v0.0.19
IsKnownStream determines if a Stream is known
func (*Manager) IsPedantic ¶ added in v0.2.0
IsPedantic checks if the manager is in pedantic mode
func (*Manager) IsStreamMaxBytesRequired ¶ added in v0.0.31
IsStreamMaxBytesRequired determines if the JetStream account requires streams to set a byte limit
func (*Manager) JetStreamAccountInfo ¶ added in v0.0.19
func (m *Manager) JetStreamAccountInfo() (info *api.JetStreamAccountStats, err error)
JetStreamAccountInfo retrieves information about the current account limits and more
func (*Manager) LoadConsumer ¶ added in v0.0.19
LoadConsumer loads a consumer by name
func (*Manager) LoadFromStreamDetailBytes ¶ added in v0.2.0
func (m *Manager) LoadFromStreamDetailBytes(sd []byte) (stream *Stream, consumers []*Consumer, err error)
LoadFromStreamDetailBytes creates a stream info from the server StreamDetails in json format, the StreamDetails should be created with Config and Consumers options set
func (*Manager) LoadOrNewConsumer ¶ added in v0.0.19
func (m *Manager) LoadOrNewConsumer(stream string, name string, opts ...ConsumerOption) (consumer *Consumer, err error)
LoadOrNewConsumer loads a consumer by name if known else creates a new one with these properties
func (*Manager) LoadOrNewConsumerFromDefault ¶ added in v0.0.19
func (m *Manager) LoadOrNewConsumerFromDefault(stream string, name string, template api.ConsumerConfig, opts ...ConsumerOption) (consumer *Consumer, err error)
LoadOrNewConsumerFromDefault loads a consumer by name if known else creates a new one with these properties based on template
func (*Manager) LoadOrNewStream ¶ added in v0.0.19
func (m *Manager) LoadOrNewStream(name string, opts ...StreamOption) (stream *Stream, err error)
LoadOrNewStream loads an existing stream or creates a new one matching opts
func (*Manager) LoadOrNewStreamFromDefault ¶ added in v0.0.19
func (m *Manager) LoadOrNewStreamFromDefault(name string, dflt api.StreamConfig, opts ...StreamOption) (stream *Stream, err error)
LoadOrNewStreamFromDefault loads an existing stream or creates a new one matching opts and template
func (*Manager) LoadStream ¶ added in v0.0.19
LoadStream loads a stream by name
func (*Manager) MetaApiLevel ¶ added in v0.2.0
MetaApiLevel determines the JetStream API level supported by the meta leader
func (*Manager) MetaLeaderStandDown ¶ added in v0.0.21
MetaLeaderStandDown requests the meta group leader to stand down, must be initiated by a system user
func (*Manager) MetaPeerRemove ¶ added in v0.0.21
MetaPeerRemove removes a peer from the JetStream meta cluster, evicting all streams, consumer etc. Use with extreme caution. If id is given it will be used by the server else name, it's generally best to remove by id
func (*Manager) MetaPurgeAccount ¶ added in v0.0.35
MetaPurgeAccount removes all data from an account, must be run in the system account
func (*Manager) NewConsumer ¶ added in v0.0.19
func (m *Manager) NewConsumer(stream string, opts ...ConsumerOption) (consumer *Consumer, err error)
NewConsumer creates a consumer based on DefaultConsumer modified by opts
func (*Manager) NewConsumerFromDefault ¶ added in v0.0.19
func (m *Manager) NewConsumerFromDefault(stream string, dflt api.ConsumerConfig, opts ...ConsumerOption) (consumer *Consumer, err error)
NewConsumerFromDefault creates a new consumer based on a template config that gets modified by opts
func (*Manager) NewStream ¶ added in v0.0.19
func (m *Manager) NewStream(name string, opts ...StreamOption) (stream *Stream, err error)
NewStream creates a new stream using DefaultStream as a starting template allowing adjustments to be made using options
func (*Manager) NewStreamConfiguration ¶ added in v0.0.19
func (m *Manager) NewStreamConfiguration(template api.StreamConfig, opts ...StreamOption) (*api.StreamConfig, error)
NewStreamConfiguration generates a new configuration based on template modified by opts
func (*Manager) NewStreamFromDefault ¶ added in v0.0.19
func (m *Manager) NewStreamFromDefault(name string, dflt api.StreamConfig, opts ...StreamOption) (stream *Stream, err error)
NewStreamFromDefault creates a new stream based on a supplied template and options
func (*Manager) NextMsg ¶ added in v0.0.19
NextMsg requests the next message from the server with the manager timeout
func (*Manager) NextMsgContext ¶ added in v0.0.19
func (m *Manager) NextMsgContext(ctx context.Context, stream string, consumer string) (*nats.Msg, error)
NextMsgContext requests the next message from the server. This request will wait for as long as the context is active. If repeated pulls will be made it's better to use NextMsgRequest()
func (*Manager) NextMsgRequest ¶ added in v0.0.20
func (m *Manager) NextMsgRequest(stream string, consumer string, inbox string, req *api.JSApiConsumerGetNextRequest) error
NextMsgRequest creates a request for a batch of messages on a consumer, data or control flow messages will be sent to inbox
func (*Manager) NextSubject ¶ added in v0.0.21
NextSubject returns the subject used to retrieve the next message for pull-based Consumers, empty when not a pull-base consumer
func (*Manager) QueryStreams ¶ added in v0.0.29
func (m *Manager) QueryStreams(opts ...StreamQueryOpt) ([]*Stream, error)
QueryStreams filters the streams found in JetStream using various filter options
func (*Manager) ReadLastMessageForSubject ¶ added in v0.0.25
func (m *Manager) ReadLastMessageForSubject(stream string, sub string) (msg *api.StoredMsg, err error)
ReadLastMessageForSubject reads the last message stored in the stream for a specific subject
func (*Manager) RestoreSnapshotFromBuffer ¶ added in v0.2.0
func (m *Manager) RestoreSnapshotFromBuffer(ctx context.Context, stream string, dataReader, metadataReader io.ReadCloser, opts ...SnapshotOption) (*api.StreamState, error)
RestoreSnapshotFromBuffer restores a stream from a s2 compressed backup read from an io.Reader.
func (*Manager) RestoreSnapshotFromDirectory ¶ added in v0.0.21
func (m *Manager) RestoreSnapshotFromDirectory(ctx context.Context, stream string, dir string, opts ...SnapshotOption) (RestoreProgress, *api.StreamState, error)
func (*Manager) StreamContainedSubjects ¶ added in v0.0.34
func (m *Manager) StreamContainedSubjects(stream string, filter ...string) (map[string]uint64, error)
StreamContainedSubjects queries the stream for the subjects it holds with optional filter
func (*Manager) StreamNames ¶ added in v0.0.19
func (m *Manager) StreamNames(filter *StreamNamesFilter) (names []string, err error)
StreamNames is a sorted list of all known Streams filtered by filter
func (*Manager) Streams ¶ added in v0.0.19
func (m *Manager) Streams(filter *StreamNamesFilter) (streams []*Stream, missing []string, offline map[string]string, err error)
Streams is a sorted list of all known Streams and a list of any stream names that were known but no details were found, since 2.12 offline streams and reasons will be included also
type MsgInfo ¶
type MsgInfo struct {
// contains filtered or unexported fields
}
MsgInfo holds metadata about a message that was received from JetStream
func ParseJSMsgMetadata ¶
ParseJSMsgMetadata parse the reply subject metadata to determine message metadata
When given a message obtained using Direct Get APIs several fields will be filled in but consumer related ones will not as there is no consumer involved in that case
func ParseJSMsgMetadataDirect ¶ added in v0.2.0
ParseJSMsgMetadataDirect parses the DIRECT GET headers into a MsgInfo, in this case all consumer related properties will not be filled in as there is no consumer involved
func ParseJSMsgMetadataReply ¶ added in v0.0.20
ParseJSMsgMetadataReply parses the reply subject of a JetStream originated message
func (*MsgInfo) ConsumerSequence ¶
ConsumerSequence is the sequence of this message in the consumer
func (*MsgInfo) Delivered ¶
Delivered is the number of times this message had delivery attempts including this one
func (*MsgInfo) Pending ¶ added in v0.0.20
Pending is the number of messages left to consume, -1 when the number is not reported
func (*MsgInfo) StreamSequence ¶
StreamSequence is the sequence of this message in the stream
type Option ¶ added in v0.0.19
type Option func(o *Manager)
Option is a option to configure the JetStream Manager
func WithAPIPrefix ¶ added in v0.0.21
WithAPIPrefix replace API endpoints like $JS.API.STREAM.NAMES with prefix.STREAM.NAMES
func WithAPIValidation ¶
func WithAPIValidation(v api.StructValidator) Option
WithAPIValidation validates responses sent from the NATS server using a validator
func WithDomain ¶ added in v0.0.24
WithDomain sets a JetStream domain, incompatible with WithApiPrefix()
func WithEventPrefix ¶ added in v0.0.21
WithEventPrefix replace event subjects like $JS.EVENT.ADVISORY.API with prefix.ADVISORY
func WithPedanticRequests ¶ added in v0.2.0
func WithPedanticRequests() Option
WithPedanticRequests enables pedantic mode in certain API calls that would avoid the server changing user configurations during request handling
func WithTimeout ¶
WithTimeout sets a timeout for the requests
type PagerOption ¶ added in v0.0.19
type PagerOption func(p *StreamPager)
PagerOption configures the stream pager
func PagerFilterSubject ¶ added in v0.0.23
func PagerFilterSubject(s string) PagerOption
PagerFilterSubject sets a filter subject for the pager
func PagerSize ¶ added in v0.0.19
func PagerSize(sz int) PagerOption
PagerSize is the size of pages to walk
func PagerStartDelta ¶ added in v0.0.19
func PagerStartDelta(d time.Duration) PagerOption
PagerStartDelta sets a starting time delta for the pager
func PagerStartId ¶ added in v0.0.19
func PagerStartId(id int) PagerOption
PagerStartId sets a starting stream sequence for the pager
func PagerTimeout ¶ added in v0.0.19
func PagerTimeout(d time.Duration) PagerOption
PagerTimeout is the time to wait for messages before it's assumed the end of the stream was reached
type RestoreProgress ¶
type RestoreProgress interface {
// StartTime is when the process started
StartTime() time.Time
// EndTime is when the process ended - zero when not completed
EndTime() time.Time
// ChunkSize is the size of the data packets sent over NATS
ChunkSize() int
// ChunksSent is the number of chunks of size ChunkSize that was sent
ChunksSent() uint32
// ChunksToSend number of chunks of ChunkSize expected to be sent
ChunksToSend() int
// BytesSent is the number of bytes sent so far
BytesSent() uint64
// BytesPerSecond is the number of bytes received in the last second, 0 during the first second
BytesPerSecond() uint64
}
type SnapshotOption ¶
type SnapshotOption func(o *snapshotOptions)
func RestoreConfiguration ¶ added in v0.0.22
func RestoreConfiguration(cfg api.StreamConfig) SnapshotOption
RestoreConfiguration overrides the configuration used to restore
func RestoreNotify ¶
func RestoreNotify(cb func(RestoreProgress)) SnapshotOption
RestoreNotify notifies cb about progress of the restore operation
func SnapshotChunkSize ¶
func SnapshotChunkSize(sz int) SnapshotOption
SnapshotChunkSize sets the size of messages holding data the server will send, good values are 64KB and 128KB
func SnapshotConsumers ¶
func SnapshotConsumers() SnapshotOption
SnapshotConsumers includes consumer configuration and state in backups
func SnapshotDebug ¶
func SnapshotDebug() SnapshotOption
SnapshotDebug enables logging using the standard go logging library
func SnapshotHealthCheck ¶
func SnapshotHealthCheck() SnapshotOption
SnapshotHealthCheck performs a health check prior to starting the snapshot
func SnapshotNotify ¶
func SnapshotNotify(cb func(SnapshotProgress)) SnapshotOption
SnapshotNotify notifies cb about progress of the snapshot operation
func WithProgress ¶ added in v0.2.0
func WithProgress(sz int) SnapshotOption
WithProgress enables progress tracking
type SnapshotProgress ¶
type SnapshotProgress interface {
// StartTime is when the process started
StartTime() time.Time
// EndTime is when the process ended - zero when not completed
EndTime() time.Time
// ChunkSize is the size of the data packets sent over NATS
ChunkSize() int
// ChunksReceived is how many chunks of ChunkSize were received
ChunksReceived() uint32
// BytesExpected is how many Bytes we should be receiving
BytesExpected() uint64
// BytesReceived is how many Bytes have been received
BytesReceived() uint64
// UncompressedBytesReceived is the number of bytes received uncompressed
UncompressedBytesReceived() uint64
// BytesPerSecond is the number of bytes received in the last second, 0 during the first second
BytesPerSecond() uint64
// HealthCheck indicates if health checking was requested
HealthCheck() bool
// Finished will be true after all data have been written
Finished() bool
}
type Stream ¶
Stream represents a JetStream Stream
func (*Stream) AdvisorySubject ¶
AdvisorySubject is a wildcard subscription subject that subscribes to all advisories for this stream
func (*Stream) AllowMsgTTL ¶ added in v0.2.0
func (*Stream) AtomicBatchPublishAllowed ¶ added in v0.2.4
func (*Stream) ClusterInfo ¶ added in v0.2.0
func (s *Stream) ClusterInfo() (api.ClusterInfo, error)
func (*Stream) Compression ¶ added in v0.1.0
func (s *Stream) Compression() api.Compression
func (*Stream) Configuration ¶
func (s *Stream) Configuration() api.StreamConfig
func (*Stream) ConsumerLimits ¶ added in v0.1.1
func (s *Stream) ConsumerLimits() api.StreamConsumerLimits
func (*Stream) ConsumerNames ¶
ConsumerNames is a list of all known consumers for this Stream
func (*Stream) ContainedSubjects ¶ added in v0.0.34
ContainedSubjects queries the stream for the subjects it holds with optional filter
func (*Stream) CounterAllowed ¶ added in v0.2.4
func (*Stream) DeleteAllowed ¶ added in v0.0.34
func (*Stream) DeleteMessage
deprecated
func (*Stream) DeleteMessageRequest ¶ added in v0.2.0
func (s *Stream) DeleteMessageRequest(req api.JSApiMsgDeleteRequest) (err error)
DeleteMessageRequest deletes a specific message from the Stream with a full request
func (*Stream) Description ¶ added in v0.0.26
func (*Stream) DetectGaps ¶ added in v0.1.0
func (s *Stream) DetectGaps(ctx context.Context, progress func(seq uint64, pending uint64), gap func(first uint64, last uint64)) error
DetectGaps detects interior deletes in a stream, reports progress through the stream and each found gap.
It uses the extended stream info to get the sequences and use that to detect gaps. The Deleted information in StreamInfo is capped at some amount so if it determines there are more messages that are deleted in the stream it will then make a consumer and walk the remainder of the stream to detect gaps the hard way
func (*Stream) DirectAllowed ¶ added in v0.0.34
func (*Stream) DirectGet ¶ added in v0.2.0
func (s *Stream) DirectGet(ctx context.Context, req api.JSApiMsgGetRequest, handler func(msg *nats.Msg)) (numPending uint64, lastSeq uint64, upToSeq uint64, err error)
DirectGet performs a direct get against the stream, supports Batch and Multi Subject behaviors
func (*Stream) DirectSubject ¶ added in v0.2.0
DirectSubject is the subject to perform direct gets against
func (*Stream) DiscardNewPerSubject ¶ added in v0.0.35
func (*Stream) DiscardPolicy ¶ added in v0.0.35
func (s *Stream) DiscardPolicy() api.DiscardPolicy
func (*Stream) DuplicateWindow ¶ added in v0.0.18
func (*Stream) EachConsumer ¶
func (s *Stream) EachConsumer(cb func(consumer *Consumer)) (missing []string, offline map[string]string, err error)
EachConsumer calls cb with each known consumer for this stream, error on any error to load consumers
func (*Stream) FastDeleteMessage
deprecated
added in
v0.0.25
func (*Stream) FirstSequence ¶ added in v0.1.0
func (*Stream) Information ¶
func (s *Stream) Information(req ...api.JSApiStreamInfoRequest) (info *api.StreamInfo, err error)
Information loads the current stream information
func (*Stream) IsCompressed ¶ added in v0.1.0
IsCompressed determines if a stream is compressed
func (*Stream) IsInternal ¶ added in v0.0.27
IsInternal indicates if a stream is considered 'internal' by the NATS team, that is, it's a backing stream for KV, Object or MQTT state
func (*Stream) IsKVBucket ¶ added in v0.0.27
IsKVBucket determines if a stream is a KV bucket
func (*Stream) IsMQTTState ¶ added in v0.0.27
IsMQTTState determines if a stream holds internal MQTT state
func (*Stream) IsMirror ¶ added in v0.0.21
IsMirror determines if this stream is a mirror of another
func (*Stream) IsObjectBucket ¶ added in v0.0.27
IsObjectBucket determines if a stream is a Object bucket
func (*Stream) IsRepublishing ¶ added in v0.0.33
func (*Stream) IsSourced ¶ added in v0.0.21
IsSourced determines if this stream is sourcing data from another stream. Other streams could be synced to this stream and it would not be reported by this property
func (*Stream) IsTemplateManaged ¶
IsTemplateManaged determines if this stream is managed by a template
func (*Stream) LatestInformation ¶
func (s *Stream) LatestInformation() (info *api.StreamInfo, err error)
LatestInformation returns the most recently fetched stream information
func (*Stream) LatestState ¶
func (s *Stream) LatestState() (state api.StreamState, err error)
LatestState returns the most recently fetched stream state
func (*Stream) LeaderStepDown ¶ added in v0.0.21
LeaderStepDown requests the current RAFT group leader in a clustered JetStream to stand down forcing a new election, the election of the next leader can be influenced by placement
func (*Stream) LoadConsumer ¶
LoadConsumer loads a named consumer related to this Stream
func (*Stream) LoadOrNewConsumer ¶
func (s *Stream) LoadOrNewConsumer(name string, opts ...ConsumerOption) (consumer *Consumer, err error)
LoadOrNewConsumer loads or creates a consumer based on these options
func (*Stream) LoadOrNewConsumerFromDefault ¶
func (s *Stream) LoadOrNewConsumerFromDefault(name string, deflt api.ConsumerConfig, opts ...ConsumerOption) (consumer *Consumer, err error)
LoadOrNewConsumerFromDefault loads or creates a consumer based on these options that adjust supplied template
func (*Stream) MaxConsumers ¶
func (*Stream) MaxMsgSize ¶
func (*Stream) MaxMsgsPerSubject ¶ added in v0.0.24
func (*Stream) MetricSubject ¶
MetricSubject is a wildcard subscription subject that subscribes to all advisories for this stream
func (*Stream) Mirror ¶ added in v0.0.21
func (s *Stream) Mirror() *api.StreamSource
func (*Stream) MirrorDirectAllowed ¶ added in v0.0.34
func (*Stream) NewConsumer ¶
func (s *Stream) NewConsumer(opts ...ConsumerOption) (consumer *Consumer, err error)
NewConsumer creates a new consumer in this Stream based on DefaultConsumer
func (*Stream) NewConsumerFromDefault ¶
func (s *Stream) NewConsumerFromDefault(dflt api.ConsumerConfig, opts ...ConsumerOption) (consumer *Consumer, err error)
NewConsumerFromDefault creates a new consumer in this Stream based on a supplied template config
func (*Stream) PageContents ¶ added in v0.0.19
func (s *Stream) PageContents(opts ...PagerOption) (*StreamPager, error)
PageContents creates a StreamPager used to traverse the contents of the stream, Close() should be called to dispose of the background consumer and resources
func (*Stream) PersistenceMode ¶ added in v0.3.0
func (s *Stream) PersistenceMode() api.PersistModeType
func (*Stream) Purge ¶
func (s *Stream) Purge(opts ...*api.JSApiStreamPurgeRequest) error
Purge deletes messages from the Stream, an optional JSApiStreamPurgeRequest can be supplied to limit the purge to a subset of messages
func (*Stream) PurgeAllowed ¶ added in v0.0.27
func (*Stream) QueryConsumers ¶ added in v0.2.0
func (s *Stream) QueryConsumers(opts ...ConsumerQueryOpt) ([]*Consumer, error)
QueryConsumers filters the streams found in JetStream using various filter options
func (*Stream) ReadLastMessageForSubject ¶ added in v0.0.25
ReadLastMessageForSubject reads the last message stored in the stream for a specific subject
func (*Stream) ReadMessage ¶
ReadMessage loads a message from the stream by its sequence number
func (*Stream) RemoveRAFTPeer ¶ added in v0.0.21
RemoveRAFTPeer removes a peer from the group indicating it will not return
func (*Stream) Retention ¶
func (s *Stream) Retention() api.RetentionPolicy
func (*Stream) RollupAllowed ¶ added in v0.0.27
func (*Stream) SchedulesAllowed ¶ added in v0.3.0
func (*Stream) Seal ¶ added in v0.0.27
Seal updates a stream so that messages can not be added or removed using the API and limits will not be processed - messages will never age out. A sealed stream can not be unsealed.
func (*Stream) SnapshotToBuffer ¶ added in v0.2.0
func (s *Stream) SnapshotToBuffer(ctx context.Context, dataBuffer, metadataBuffer io.WriteCloser, opts ...SnapshotOption) (SnapshotProgress, error)
SnapshotToBuffer creates a compressed s2 backup and writes to an io.Writer
func (*Stream) SnapshotToDirectory ¶ added in v0.0.21
func (s *Stream) SnapshotToDirectory(ctx context.Context, dir string, opts ...SnapshotOption) (SnapshotProgress, error)
SnapshotToDirectory creates a backup into s2 compressed tar file
func (*Stream) Sources ¶ added in v0.0.21
func (s *Stream) Sources() []*api.StreamSource
func (*Stream) State ¶
func (s *Stream) State(req ...api.JSApiStreamInfoRequest) (stats api.StreamState, err error)
State retrieves the Stream State
func (*Stream) Storage ¶
func (s *Stream) Storage() api.StorageType
func (*Stream) SubjectDeleteMarkerTTL ¶ added in v0.2.0
func (*Stream) UpdateConfiguration ¶
func (s *Stream) UpdateConfiguration(cfg api.StreamConfig, opts ...StreamOption) error
UpdateConfiguration updates the stream using cfg modified by opts, reloads configuration from the server post update
type StreamNamesFilter ¶ added in v0.0.20
type StreamNamesFilter struct {
// Subject filter the names to those consuming messages matching this subject or wildcard
Subject string `json:"subject,omitempty"`
}
StreamNamesFilter limits the names being returned by the names API
type StreamOption ¶
type StreamOption func(o *api.StreamConfig) error
StreamOption configures a stream
func AllowAtomicBatchPublish ¶ added in v0.2.4
func AllowAtomicBatchPublish() StreamOption
func AllowCounter ¶ added in v0.2.4
func AllowCounter() StreamOption
func AllowDirect ¶ added in v0.0.34
func AllowDirect() StreamOption
func AllowMsgTTL ¶ added in v0.2.0
func AllowMsgTTL() StreamOption
func AllowRollup ¶ added in v0.0.27
func AllowRollup() StreamOption
func AllowSchedules ¶ added in v0.3.0
func AllowSchedules() StreamOption
func AppendSource ¶ added in v0.0.21
func AppendSource(source *api.StreamSource) StreamOption
func AsyncPersistence ¶ added in v0.3.0
func AsyncPersistence() StreamOption
func Compression ¶ added in v0.1.0
func Compression(alg api.Compression) StreamOption
func ConsumerLimits ¶ added in v0.1.1
func ConsumerLimits(limits api.StreamConsumerLimits) StreamOption
func DenyDelete ¶ added in v0.0.27
func DenyDelete() StreamOption
func DenyPurge ¶ added in v0.0.27
func DenyPurge() StreamOption
func DiscardNew ¶
func DiscardNew() StreamOption
func DiscardNewPerSubject ¶ added in v0.0.35
func DiscardNewPerSubject() StreamOption
func DiscardOld ¶
func DiscardOld() StreamOption
func DuplicateWindow ¶ added in v0.0.18
func DuplicateWindow(d time.Duration) StreamOption
func FileStorage ¶
func FileStorage() StreamOption
func FirstSequence ¶ added in v0.1.0
func FirstSequence(seq uint64) StreamOption
func InterestRetention ¶
func InterestRetention() StreamOption
func LimitsRetention ¶
func LimitsRetention() StreamOption
func MaxAge ¶
func MaxAge(m time.Duration) StreamOption
func MaxBytes ¶
func MaxBytes(m int64) StreamOption
func MaxConsumers ¶
func MaxConsumers(m int) StreamOption
func MaxMessageSize ¶
func MaxMessageSize(m int32) StreamOption
func MaxMessages ¶
func MaxMessages(m int64) StreamOption
func MaxMessagesPerSubject ¶ added in v0.0.24
func MaxMessagesPerSubject(m int64) StreamOption
func MemoryStorage ¶
func MemoryStorage() StreamOption
func Mirror ¶ added in v0.0.21
func Mirror(stream *api.StreamSource) StreamOption
func MirrorDirect ¶ added in v0.0.34
func MirrorDirect() StreamOption
func NoAck ¶
func NoAck() StreamOption
func NoAllowAtomicBatchPublish ¶ added in v0.2.4
func NoAllowAtomicBatchPublish() StreamOption
func NoAllowCounter ¶ added in v0.2.4
func NoAllowCounter() StreamOption
func NoAllowDirect ¶ added in v0.0.34
func NoAllowDirect() StreamOption
func NoMirrorDirect ¶ added in v0.0.34
func NoMirrorDirect() StreamOption
func PlacementCluster ¶ added in v0.0.21
func PlacementCluster(cluster string) StreamOption
func PlacementPreferredLeader ¶ added in v0.2.0
func PlacementPreferredLeader(leader string) StreamOption
func PlacementTags ¶ added in v0.0.21
func PlacementTags(tags ...string) StreamOption
func Replicas ¶
func Replicas(r int) StreamOption
func Republish ¶ added in v0.0.33
func Republish(m *api.RePublish) StreamOption
func Sources ¶ added in v0.0.21
func Sources(streams ...*api.StreamSource) StreamOption
func StreamDescription ¶ added in v0.0.26
func StreamDescription(d string) StreamOption
StreamDescription is a textual description of this stream to provide additional context
func StreamMetadata ¶ added in v0.1.0
func StreamMetadata(meta map[string]string) StreamOption
func SubjectDeleteMarkerTTL ¶ added in v0.2.0
func SubjectDeleteMarkerTTL(d time.Duration) StreamOption
func SubjectTransform ¶ added in v0.2.0
func SubjectTransform(subjectTransform *api.SubjectTransformConfig) StreamOption
func Subjects ¶
func Subjects(s ...string) StreamOption
func WorkQueueRetention ¶
func WorkQueueRetention() StreamOption
type StreamPager ¶ added in v0.0.19
type StreamPager struct {
// contains filtered or unexported fields
}
func (*StreamPager) Close ¶ added in v0.0.19
func (p *StreamPager) Close() error
Close dispose of the resources used by the pager and should be called when done
func (*StreamPager) NextMsg ¶ added in v0.0.19
NextMsg retrieves the next message from the pager interrupted by ctx.
last indicates if the message is the last in the current page, the next call to NextMsg will first request the next page, if the client is prompting users to continue to the next page it should be done when last is true
When the end of the stream is reached err will be non nil and last will be true otherwise err being non nil while last is false indicate a failed state. End is indicated by no new messages arriving after ctx timeout or the time set using PagerTimes() is reached
type StreamQueryOpt ¶ added in v0.0.29
type StreamQueryOpt func(query *streamQuery) error
func StreamQueryApiLevelMin ¶ added in v0.2.0
func StreamQueryApiLevelMin(level int) StreamQueryOpt
StreamQueryApiLevelMin limits results to assets requiring API Level above or equal to level
func StreamQueryClusterName ¶ added in v0.0.29
func StreamQueryClusterName(c string) StreamQueryOpt
StreamQueryClusterName limits results to servers within a cluster matched by a regular expression
func StreamQueryExpression ¶ added in v0.1.0
func StreamQueryExpression(e string) StreamQueryOpt
StreamQueryExpression filters the stream using the expr expression language Using this option with a binary built with the `noexprlang` build tag will always return ErrNoExprLangBuild.
func StreamQueryFewerConsumersThan ¶ added in v0.0.29
func StreamQueryFewerConsumersThan(c uint) StreamQueryOpt
StreamQueryFewerConsumersThan limits results to streams with fewer than or equal consumers than c
func StreamQueryIdleLongerThan ¶ added in v0.0.29
func StreamQueryIdleLongerThan(p time.Duration) StreamQueryOpt
StreamQueryIdleLongerThan limits results to streams that has not received messages for a period longer than p
func StreamQueryInvert ¶ added in v0.0.29
func StreamQueryInvert() StreamQueryOpt
StreamQueryInvert inverts the logic of filters, older than becomes newer than and so forth
func StreamQueryIsMirror ¶ added in v0.1.0
func StreamQueryIsMirror() StreamQueryOpt
func StreamQueryIsSourced ¶ added in v0.1.0
func StreamQueryIsSourced() StreamQueryOpt
func StreamQueryLeaderServer ¶ added in v0.2.0
func StreamQueryLeaderServer(server string) StreamQueryOpt
StreamQueryLeaderServer finds clustered streams where a certain node is the leader
func StreamQueryOlderThan ¶ added in v0.0.29
func StreamQueryOlderThan(p time.Duration) StreamQueryOpt
StreamQueryOlderThan limits the results to streams older than p
func StreamQueryReplicas ¶ added in v0.1.0
func StreamQueryReplicas(r uint) StreamQueryOpt
StreamQueryReplicas finds streams with a certain number of replicas or less
func StreamQueryServerName ¶ added in v0.0.29
func StreamQueryServerName(s string) StreamQueryOpt
StreamQueryServerName limits results to servers matching a regular expression
func StreamQuerySubjectWildcard ¶ added in v0.0.33
func StreamQuerySubjectWildcard(s string) StreamQueryOpt
StreamQuerySubjectWildcard limits results to streams with subject interest matching standard a nats wildcard
func StreamQueryWithoutMessages ¶ added in v0.0.29
func StreamQueryWithoutMessages() StreamQueryOpt
StreamQueryWithoutMessages limits results to streams with no messages
Source Files
¶
Directories
¶
| Path | Synopsis |
|---|---|
|
Package natscontext provides a way for sets of configuration options to be stored in named files and later retrieved either by name or if no name is supplied by access a chosen default context.
|
Package natscontext provides a way for sets of configuration options to be stored in named files and later retrieved either by name or if no name is supplied by access a chosen default context. |