Documentation ¶
Index ¶
- func ContextWithBus(ctx context.Context, bus *Bus) context.Context
- func FilterFmt(kind string, labels ...Label) string
- type Bus
- func (b *Bus) DisableBufferPublication()
- func (b *Bus) EnableBufferPublication(capacity uint32)
- func (b *Bus) Name() string
- func (b *Bus) Pub(v Messager, labels ...Label)
- func (b *Bus) SetDefaultSubscriptionQueueSize(i uint64)
- func (b *Bus) SetDrainChanDuration(duration time.Duration)
- func (b *Bus) SetPanicOnFullQueue(graceTime time.Duration)
- func (b *Bus) Start(ctx context.Context)
- func (b *Bus) Stop()
- func (b *Bus) Sub(name string, options ...interface{}) *Subscription
- type ErrSubscriptionIDNotFound
- type Label
- type Labels
- type Messager
- type Msg
- type Publisher
- type QueueSizer
- type Subscriber
- type Subscription
- type SubscriptionError
- type SubscriptionQueueThreshold
- type Timeout
- type Timeouter
- type WithQueueSize
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func ContextWithBus ¶
ContextWithBus stores the bus in the context and returns the new context.
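The usual Go idiom for carrying a value in a context can be sketched as follows. This is a minimal illustration, not the package's actual code: toyBus, busKey and the lowercase helper names are hypothetical stand-ins for Bus, ContextWithBus and BusFromContext.

```go
package main

import (
	"context"
	"fmt"
)

// toyBus is a hypothetical stand-in for the package's Bus type.
type toyBus struct{ name string }

// busKey is an unexported key type, so no other package can collide with it.
type busKey struct{}

// contextWithBus returns a child context that carries b.
func contextWithBus(ctx context.Context, b *toyBus) context.Context {
	return context.WithValue(ctx, busKey{}, b)
}

// busFromContext returns the bus stored in ctx, or nil if there is none.
func busFromContext(ctx context.Context) *toyBus {
	b, _ := ctx.Value(busKey{}).(*toyBus)
	return b
}

// demoName stores a bus in a context and reads its name back.
func demoName(name string) string {
	ctx := contextWithBus(context.Background(), &toyBus{name: name})
	return busFromContext(ctx).name
}

// emptyHasBus reports whether a bare background context carries a bus.
func emptyHasBus() bool {
	return busFromContext(context.Background()) != nil
}

func main() {
	fmt.Println(demoName("main"))
	fmt.Println(emptyHasBus())
}
```

The unexported key type keeps the stored value private to the package that defines it, which is why accessor functions are the only way to read it back.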
Types ¶
type Bus ¶
func BusFromContext ¶
func PubFromContext ¶
func (*Bus) DisableBufferPublication ¶
func (b *Bus) DisableBufferPublication()
DisableBufferPublication disables publication buffering. It dequeues the publication buffer channel for retransmission, then closes the channel, after which new publications are delivered immediately. The pubsub default behavior is unbuffered.
func (*Bus) EnableBufferPublication ¶
EnableBufferPublication enables publication buffering. Future publication commands are pushed to a fresh publication buffer channel of cmdPub, whose capacity is the capacity argument, instead of being delivered immediately.
The pubsub default behavior is unbuffered.
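The enable/disable behavior described above can be pictured with a toy model. This is only a sketch under assumptions: toyBus, its string messages and its delivered slice are hypothetical and do not reflect the package's internal cmdPub handling.

```go
package main

import "fmt"

// toyBus is a hypothetical model of a bus with optional publication buffering.
type toyBus struct {
	buffer    chan string // nil while buffering is disabled
	delivered []string    // messages that reached subscribers
}

// enable starts buffering into a fresh channel of the given capacity.
func (b *toyBus) enable(capacity uint32) {
	b.buffer = make(chan string, capacity)
}

// pub buffers the message when buffering is enabled, else delivers it.
// In this sketch pub blocks if the buffer is full.
func (b *toyBus) pub(msg string) {
	if b.buffer != nil {
		b.buffer <- msg
		return
	}
	b.delivered = append(b.delivered, msg)
}

// disable closes the buffer, delivers what it held in order, and
// returns the bus to immediate delivery.
func (b *toyBus) disable() {
	if b.buffer == nil {
		return
	}
	close(b.buffer)
	for msg := range b.buffer {
		b.delivered = append(b.delivered, msg)
	}
	b.buffer = nil
}

func main() {
	b := &toyBus{}
	b.enable(2)
	b.pub("a")
	b.pub("b")
	fmt.Println(len(b.delivered)) // nothing delivered while buffering
	b.disable()
	b.pub("c")
	fmt.Println(b.delivered)
}
```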
func (*Bus) Pub ¶
Pub posts a new Publication on the bus. The labels are added to the existing labels of v, so a subscriber can retrieve the publication labels from the received message.
func (*Bus) SetDefaultSubscriptionQueueSize ¶
SetDefaultSubscriptionQueueSize overrides the default subscriber queue size of a bus that has not been started yet.
It panics if called on a started bus.
func (*Bus) SetDrainChanDuration ¶
SetDrainChanDuration overrides defaultDrainChanDuration of a bus that has not been started yet.
It panics if called on a started bus.
func (*Bus) SetPanicOnFullQueue ¶
SetPanicOnFullQueue enables a panic, after graceTime, when a subscription with no timeout has reached its maximum queue size. A zero graceTime disables the panic-on-full-queue feature.
It panics if called on a started bus.
func (*Bus) Sub ¶
func (b *Bus) Sub(name string, options ...interface{}) *Subscription
Sub requests a new Subscription called name on the bus.
The non-empty name parameter is used to compute the subscription family (the first field of name). Example: with name "daemon.imon foo@node1", the family is "daemon.imon". Sub panics if name is empty.
Supported options: Timeouter, QueueSizer.
Timeouter sets the timeout the subscriber has to pull each message. A subscriber that exceeds this timeout is automatically dropped, and a SubscriptionError message is published on the bus. The default is no timeout.
QueueSizer sets the subscriber queue size. The default value is bus dependent (see SetDefaultSubscriptionQueueSize).
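The family rule for subscription names can be sketched like this. The helper name family is hypothetical, and the sketch assumes that fields are separated by whitespace, which the example name suggests but the documentation does not state.

```go
package main

import (
	"fmt"
	"strings"
)

// family returns the first whitespace-separated field of a subscription
// name. It panics on an empty name, mirroring the documented behavior of Sub.
// Hypothetical helper: the package may compute the family differently.
func family(name string) string {
	fields := strings.Fields(name)
	if len(fields) == 0 {
		panic("empty subscription name")
	}
	return fields[0]
}

func main() {
	fmt.Println(family("daemon.imon foo@node1"))
}
```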
type ErrSubscriptionIDNotFound ¶
type ErrSubscriptionIDNotFound struct {
// contains filtered or unexported fields
}
func (ErrSubscriptionIDNotFound) Error ¶
func (e ErrSubscriptionIDNotFound) Error() string
type Labels ¶
Labels allow message routing filtering based on key/value matching
func (Labels) Key ¶
Key returns the labelMap key as a string with ordered label names, to ensure it matches the publication filter label combination.
func (Labels) Keys ¶
Keys returns all combinations of the labels, including the empty combination. Keys are sorted first, so permutations do not need to be generated. Example:
keys of l1=foo l2=foo l3=foo:
[
"",
"{l1=foo}",
"{l1=foo}{l2=foo}",
"{l1=foo}{l2=foo}{l3=foo}",
"{l1=foo}{l3=foo}",
"{l2=foo}",
"{l2=foo}{l3=foo}",
"{l3=foo}",
]
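One way to produce such a list is to sort the labels by name and enumerate every subset. The sketch below is an illustration only: the label struct and the keys function are hypothetical, and the package's own implementation may differ.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// label is a hypothetical stand-in for the package's Label type.
type label struct{ name, value string }

// keys returns one key per subset of labels, including the empty subset.
// Labels are sorted by name first, so each subset has a single spelling.
func keys(labels []label) []string {
	sorted := append([]label(nil), labels...)
	sort.Slice(sorted, func(i, j int) bool { return sorted[i].name < sorted[j].name })
	n := len(sorted)
	out := make([]string, 0, 1<<n)
	// Each bit of mask selects whether the label at that index is included.
	for mask := 0; mask < 1<<n; mask++ {
		var b strings.Builder
		for i := 0; i < n; i++ {
			if mask&(1<<i) != 0 {
				fmt.Fprintf(&b, "{%s=%s}", sorted[i].name, sorted[i].value)
			}
		}
		out = append(out, b.String())
	}
	sort.Strings(out)
	return out
}

func main() {
	for _, k := range keys([]label{{"l3", "foo"}, {"l1", "foo"}, {"l2", "foo"}}) {
		fmt.Printf("%q\n", k)
	}
}
```

With n labels this yields 2^n keys, so the approach only suits small label sets.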
type Publisher ¶
Publisher is an interface for publishing messages with optional associated labels. Pub publishes a Messager with an optional set of Label parameters to the subscribers.
type QueueSizer ¶
type QueueSizer interface {
// contains filtered or unexported methods
}
type Subscriber ¶
type Subscriber interface {
Sub(string, ...interface{}) *Subscription
}
Subscriber defines the interface for subscribing to a topic with filters, returning a Subscription instance.
type Subscription ¶
type Subscription struct {
// C is the channel exposed to the subscriber for polling
C chan any
// contains filtered or unexported fields
}
func SubFromContext ¶
func SubFromContext(ctx context.Context, name string, options ...interface{}) *Subscription
func (*Subscription) AddFilter ¶
func (sub *Subscription) AddFilter(v any, labels ...Label)
func (*Subscription) DelFilter ¶
func (sub *Subscription) DelFilter(v any, labels ...Label)
func (*Subscription) Drain ¶
func (sub *Subscription) Drain()
Drain dequeues the exposed channel.
Drain is called automatically during sub.Stop().
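Draining a channel for a bounded time can be sketched as below. This is an assumption-laden illustration: the drain function and the demoWait constant are hypothetical, and whether the package bounds Drain with the duration set by SetDrainChanDuration is a guess, not something this page states.

```go
package main

import (
	"fmt"
	"time"
)

// demoWait is a hypothetical drain duration used by the example.
const demoWait = 50 * time.Millisecond

// drain discards queued values from c until c is closed or d elapses,
// and returns how many values were discarded.
func drain(c chan any, d time.Duration) int {
	timer := time.NewTimer(d)
	defer timer.Stop()
	n := 0
	for {
		select {
		case _, ok := <-c:
			if !ok {
				return n // channel closed and empty
			}
			n++
		case <-timer.C:
			return n // gave up waiting for more values
		}
	}
}

func main() {
	c := make(chan any, 3)
	c <- "a"
	c <- "b"
	c <- "c"
	close(c)
	fmt.Println(drain(c, demoWait))
}
```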
func (*Subscription) Start ¶
func (sub *Subscription) Start()
func (*Subscription) Stop ¶
func (sub *Subscription) Stop() error
Stop closes the subscription and dequeues the private and exposed subscription channels.
func (*Subscription) String ¶
func (sub *Subscription) String() string
type SubscriptionError ¶
type SubscriptionError struct {
Msg
ID uuid.UUID `json:"id"`
Name string `json:"name"`
ErrS string `json:"error"`
}
SubscriptionError is a publication emitted when a subscriber notification exceeds its timeout.
func (SubscriptionError) Kind ¶
func (m SubscriptionError) Kind() string
type SubscriptionQueueThreshold ¶
type SubscriptionQueueThreshold struct {
Msg
ID uuid.UUID
Name string `json:"name"`
// Count is the current used slots in internal subscriber queue
Count uint64 `json:"count"`
// From is the previous high threshold value
From uint64 `json:"from"`
// To is the new high threshold value
To uint64 `json:"to"`
// Limit is the maximum queue size
Limit uint64 `json:"limit"`
}
SubscriptionQueueThreshold is a publication emitted when a subscriber queue reaches or leaves its current high threshold value.
func (SubscriptionQueueThreshold) Kind ¶
func (m SubscriptionQueueThreshold) Kind() string
type WithQueueSize ¶
type WithQueueSize uint64