Documentation ¶
Overview ¶
Package mq provides a unified abstraction for message queue components.
Key components:
- message.go: Defines message attributes (Topic, Payload, Extra, Tag, etc.) enabling different MQ drivers to implement specific features via composition.
- pubsub.go: Defines standard roles including Consumer, Producer, Observer, and Factory.
- resource.go: Provides universal resource management, allowing resources created by the Factory to be managed through a centralized ResourceManager.
Code generated by genx:code DO NOT EDIT.
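The composition approach described in the overview can be illustrated with a minimal sketch. The three interfaces below are local mirrors of capability interfaces from this package; `demoMessage` and `describe` are hypothetical names introduced only for illustration, not part of the package.

```go
package main

import (
	"fmt"
	"strings"
)

// Local mirrors of three capability interfaces documented in this package.
type HasPayload interface{ Payload() []byte }
type HasRetryCount interface{ RetryCount() uint32 }
type HasOrderingKey interface{ OrderingKey() string }

// demoMessage is a hypothetical driver message that opts into two capabilities.
type demoMessage struct {
	payload []byte
	retries uint32
}

func (m *demoMessage) Payload() []byte    { return m.payload }
func (m *demoMessage) RetryCount() uint32 { return m.retries }

// describe probes m for optional capabilities using type assertions and
// reports only the attributes that m actually exposes.
func describe(m any) string {
	var parts []string
	if v, ok := m.(HasPayload); ok {
		parts = append(parts, fmt.Sprintf("payload=%q", v.Payload()))
	}
	if v, ok := m.(HasRetryCount); ok {
		parts = append(parts, fmt.Sprintf("retries=%d", v.RetryCount()))
	}
	if v, ok := m.(HasOrderingKey); ok {
		parts = append(parts, fmt.Sprintf("orderingKey=%q", v.OrderingKey()))
	}
	return strings.Join(parts, " ")
}

func main() {
	fmt.Println(describe(&demoMessage{payload: []byte("hi"), retries: 2}))
	// prints: payload="hi" retries=2
}
```

Because each capability is a separate small interface, generic code can ask for exactly what it needs and degrade gracefully when a driver does not provide it.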
Index ¶
- func CRC(k string) uint16
- func Carry[PM any, CM any](ps PubSub[PM, CM]) contextx.Carrier
- func Fnv(k string) uint16
- func With[PM any, CM any](ctx context.Context, ps PubSub[PM, CM]) context.Context
- type Acknowledger
- type AcknowledgerCanDiscard
- type AsyncPubCallback
- type CanAppendExtra
- type CanAppendTags
- type CanBeReleased
- type CanRefreshConsumedAt
- type CanRefreshPublishedAt
- type CanSetBacklog
- type CanSetConsumedAt
- type CanSetDelay
- type CanSetExpiredAt
- type CanSetOffset
- type CanSetOrderingKey
- type CanSetPartitionID
- type CanSetPartitionKey
- type CanSetPayload
- type CanSetProducer
- type CanSetPublishedAt
- type CanSetRetryCount
- type CanSetSequenceID
- type CanSetTopic
- type CanSetUnderlying
- type ConsumeHandleMode
- type Consumer
- type Error
- type Factory
- type HasBacklog
- type HasBrokerLatency
- type HasConsumedAt
- type HasDelay
- type HasExpiredAt
- type HasExtra
- type HasLatency
- type HasOffset
- type HasOrderingKey
- type HasPartitionID
- type HasPartitionKey
- type HasPayload
- type HasProducer
- type HasPublishedAt
- type HasRetryCount
- type HasSequenceID
- type HasTags
- type HasTopic
- type HasUnderlying
- type Hasher
- type Observer
- type Option
- type OptionApplier
- type OptionApplyFunc
- type Producer
- type PubSub
- type ReleaseOption
- type ReleaseOptionFunc
- type Resource
- type ResourceManager
- type SubCallback
- type SubHandler
- type Suite
- type Unsubscriber
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
Types ¶
type Acknowledger ¶ added in v0.3.0
Acknowledger provides message acknowledgment and negative acknowledgment.
Used in SubHandler or SubCallback to explicitly control delivery semantics: Ack indicates successful processing; Nack indicates failure (typically triggers retry or dead-letter).
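The Ack/Nack semantics described above can be sketched as follows. The interface definition is not shown on this page, so the method set used here (`Ack(M) error`, `Nack(M) error`) is an assumption, and `recorder`, `process` and `demo` are hypothetical helpers.

```go
package main

import (
	"errors"
	"fmt"
)

// Acknowledger is a local stand-in. Its method set (Ack and Nack taking the
// message) is an assumption; the real signature is not shown on this page.
type Acknowledger[M any] interface {
	Ack(M) error
	Nack(M) error
}

// recorder is a hypothetical Acknowledger that records every settlement.
type recorder struct{ log []string }

func (r *recorder) Ack(m string) error  { r.log = append(r.log, "ack:"+m); return nil }
func (r *recorder) Nack(m string) error { r.log = append(r.log, "nack:"+m); return nil }

// process runs handle on m and settles the message explicitly:
// Ack on success, Nack on failure.
func process(a Acknowledger[string], m string, handle func(string) error) error {
	if err := handle(m); err != nil {
		if nerr := a.Nack(m); nerr != nil {
			return fmt.Errorf("nack failed: %v (handler error: %w)", nerr, err)
		}
		return err
	}
	return a.Ack(m)
}

// demo settles one succeeding and one failing message and returns the record.
func demo() []string {
	r := &recorder{}
	_ = process(r, "m1", func(string) error { return nil })
	_ = process(r, "m2", func(string) error { return errors.New("boom") })
	return r.log
}

func main() {
	fmt.Println(demo()) // prints: [ack:m1 nack:m2]
}
```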
type AcknowledgerCanDiscard ¶ added in v0.3.1
type AcknowledgerCanDiscard[M any] interface {
	Acknowledger[M]
	// Discard discards the message and delivers it directly to the DLQ.
	// Use it when the business logic is certain that message M can never be
	// handled, for example when the message cannot be unmarshalled.
	Discard(M) error
}
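A minimal sketch of the discard path for payloads that cannot be unmarshalled. The `Ack`/`Nack` method set of `Acknowledger` is an assumption (only `Discard(M) error` is shown above); `order`, `recorder`, `settle` and `demo` are hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Acknowledger's method set (Ack, Nack) is assumed; AcknowledgerCanDiscard
// mirrors the interface documented above.
type Acknowledger[M any] interface {
	Ack(M) error
	Nack(M) error
}

type AcknowledgerCanDiscard[M any] interface {
	Acknowledger[M]
	Discard(M) error
}

// order is a hypothetical payload schema.
type order struct {
	ID int `json:"id"`
}

// recorder is a hypothetical acknowledger that records every settlement.
type recorder struct{ log []string }

func (r *recorder) Ack(m []byte) error     { r.log = append(r.log, "ack:"+string(m)); return nil }
func (r *recorder) Nack(m []byte) error    { r.log = append(r.log, "nack:"+string(m)); return nil }
func (r *recorder) Discard(m []byte) error { r.log = append(r.log, "discard:"+string(m)); return nil }

// settle decodes raw. A payload that cannot be unmarshalled will not succeed
// on retry, so it is discarded when the acknowledger supports that and
// nacked otherwise.
func settle(a Acknowledger[[]byte], raw []byte) error {
	var o order
	if err := json.Unmarshal(raw, &o); err != nil {
		if d, ok := a.(AcknowledgerCanDiscard[[]byte]); ok {
			return d.Discard(raw)
		}
		return a.Nack(raw)
	}
	return a.Ack(raw)
}

// demo settles one valid and one malformed payload and returns the record.
func demo() []string {
	r := &recorder{}
	_ = settle(r, []byte(`{"id":7}`))
	_ = settle(r, []byte(`not json`))
	return r.log
}

func main() {
	fmt.Println(demo()) // prints: [ack:{"id":7} discard:not json]
}
```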
type AsyncPubCallback ¶ added in v0.3.0
type CanAppendExtra ¶ added in v0.3.0
type CanAppendTags ¶ added in v0.3.0
type CanAppendTags interface {
AddTags(...string)
}
type CanBeReleased ¶ added in v0.3.0
type CanBeReleased interface {
// Release closes the resource and cleans it up. ReleaseOption.Unsub denotes whether to unsubscribe before closing.
Release(...ReleaseOptionFunc) error
}
type CanRefreshConsumedAt ¶ added in v0.3.0
type CanRefreshConsumedAt interface {
RefreshConsumedAt()
}
type CanRefreshPublishedAt ¶ added in v0.3.0
type CanRefreshPublishedAt interface {
RefreshPublishedAt()
}
type CanSetBacklog ¶ added in v0.3.0
type CanSetBacklog interface {
SetBacklog(int64)
}
type CanSetConsumedAt ¶ added in v0.3.0
type CanSetDelay ¶ added in v0.3.0
type CanSetExpiredAt ¶ added in v0.3.1
type CanSetOffset ¶ added in v0.3.0
type CanSetOffset interface {
SetOffset(int64)
}
type CanSetOrderingKey ¶ added in v0.3.0
type CanSetOrderingKey interface {
SetOrderingKey(string)
}
type CanSetPartitionID ¶ added in v0.3.0
type CanSetPartitionID interface {
SetPartitionID(uint64)
}
type CanSetPartitionKey ¶ added in v0.3.0
type CanSetPartitionKey interface {
SetPartitionKey(string)
}
type CanSetPayload ¶ added in v0.3.0
type CanSetPayload interface {
SetPayload([]byte)
}
type CanSetProducer ¶ added in v0.3.0
type CanSetProducer interface {
SetProducer(string)
}
type CanSetPublishedAt ¶ added in v0.3.0
type CanSetRetryCount ¶ added in v0.3.1
type CanSetRetryCount interface {
AddRetryCount()
SetRetryCount(uint32)
}
type CanSetSequenceID ¶ added in v0.3.0
type CanSetSequenceID interface {
SetSequenceID(int64)
}
type CanSetTopic ¶ added in v0.3.0
type CanSetTopic interface {
SetTopic(string)
}
type CanSetUnderlying ¶ added in v0.3.0
type CanSetUnderlying[T any] interface {
	SetUnderlying(T)
}
type ConsumeHandleMode ¶ added in v0.3.0
type ConsumeHandleMode int
const (
	// GlobalOrdered messages are processed strictly one by one in the order
	// they were received globally.
	GlobalOrdered ConsumeHandleMode = iota
	// PartitionOrdered messages with the same partition key are processed sequentially,
	// while messages with different keys are handled in parallel.
	PartitionOrdered
	// Concurrent messages are processed in parallel with no guarantee of ordering.
	// This mode offers the highest throughput.
	Concurrent
)
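The PartitionOrdered behavior can be sketched as hashing each partition key to a lane, then draining lanes in parallel while keeping each lane sequential. This is an illustration only, not how any driver in this package dispatches messages: `msg`, `laneFor`, `partition` and `run` are hypothetical, and the hash comes from the standard library's `hash/fnv` rather than this package's `Fnv` or `CRC` helpers.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"sync"
)

// msg is a hypothetical message carrying a partition key.
type msg struct {
	key  string
	body string
}

// laneFor maps a partition key to one of n lanes using FNV-1a.
func laneFor(key string, n int) int {
	h := fnv.New32a()
	_, _ = h.Write([]byte(key))
	return int(h.Sum32() % uint32(n))
}

// partition splits msgs into n lanes; arrival order is preserved per lane,
// so messages sharing a key keep their relative order.
func partition(msgs []msg, n int) [][]msg {
	lanes := make([][]msg, n)
	for _, m := range msgs {
		i := laneFor(m.key, n)
		lanes[i] = append(lanes[i], m)
	}
	return lanes
}

// run drains every lane in its own goroutine: sequential within a lane,
// parallel across lanes.
func run(lanes [][]msg, handle func(lane int, m msg)) {
	var wg sync.WaitGroup
	for i, lane := range lanes {
		wg.Add(1)
		go func(i int, lane []msg) {
			defer wg.Done()
			for _, m := range lane {
				handle(i, m)
			}
		}(i, lane)
	}
	wg.Wait()
}

func main() {
	msgs := []msg{{"a", "1"}, {"b", "1"}, {"a", "2"}, {"a", "3"}, {"b", "2"}}
	const n = 4
	lanes := partition(msgs, n)
	out := make([][]string, n) // each goroutine writes only its own slot
	run(lanes, func(lane int, m msg) {
		out[lane] = append(out[lane], m.key+m.body)
	})
	for i, seq := range out {
		if len(seq) > 0 {
			fmt.Println("lane", i, seq)
		}
	}
}
```

With a single lane this degenerates to GlobalOrdered, and with one goroutine per message it becomes Concurrent, which is one way to see why throughput rises as ordering guarantees weaken.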
type Consumer ¶ added in v0.3.0
type Consumer[M any] interface {
	Acknowledger[M]
	Observer[M]
}
Consumer is the universal subscriber interface for message queues. The type parameter M denotes the consumed message type, which is typically implemented by the individual driver (e.g. pulsar, rabbitmq, kafka).
type Error ¶ added in v0.3.0
type Error int8
Error represents message queue error codes. +genx:code
const (
	ERROR_UNDEFINED Error = iota
	ERROR__CLI_CLOSED              // client closed
	ERROR__CLI_INIT_ERROR          // client init failed
	ERROR__SUB_CLOSED              // subscriber closed
	ERROR__SUB_BOOTED              // subscriber is already booted
	ERROR__SUB_HANDLER_PANICKED    // subscriber handler panicked
	ERROR__SUB_PARSE_MESSAGE_ERROR // subscriber failed to parse message
	ERROR__SUB_UNSUBSCRIBED        // subscriber unsubscribed
	ERROR__PUB_CLOSED              // publisher closed
	ERROR__PUB_INVALID_MESSAGE     // publisher got invalid message
)
type HasBacklog ¶ added in v0.3.0
type HasBacklog interface {
Backlog() int64
}
HasBacklog exposes the message backlog observed when consuming.
type HasBrokerLatency ¶ added in v0.3.0
type HasConsumedAt ¶ added in v0.3.0
type HasExpiredAt ¶ added in v0.3.1
type HasExpiredAt interface {
ExpiredAt() int64
}
HasExpiredAt lets a consumer check whether a message has expired, so it can decide whether the message is still worth handling.
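A minimal sketch of such an expiry check. The unit of `ExpiredAt()` is not stated on this page, so the helper takes `now` in whatever unit the driver uses; treating a non-positive value as "no expiry" is an assumption. `timedMessage` and `expired` are hypothetical names.

```go
package main

import "fmt"

// HasExpiredAt mirrors the interface documented above.
type HasExpiredAt interface{ ExpiredAt() int64 }

// timedMessage is a hypothetical message carrying an expiry timestamp.
type timedMessage struct{ expiredAt int64 }

func (m timedMessage) ExpiredAt() int64 { return m.expiredAt }

// expired reports whether m carries an expiry at or before now.
// now must use the same unit as ExpiredAt. Messages without the capability,
// or with a non-positive expiry (assumed to mean "never"), are not expired.
func expired(m any, now int64) bool {
	v, ok := m.(HasExpiredAt)
	if !ok {
		return false
	}
	at := v.ExpiredAt()
	return at > 0 && at <= now
}

func main() {
	now := int64(150)
	fmt.Println(expired(timedMessage{expiredAt: 100}, now)) // true
	fmt.Println(expired(timedMessage{expiredAt: 200}, now)) // false
	fmt.Println(expired("no expiry support", now))          // false
}
```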
type HasLatency ¶ added in v0.3.0
type HasOrderingKey ¶ added in v0.3.0
type HasOrderingKey interface {
OrderingKey() string
}
HasOrderingKey is used during message consumption to identify the business key that governs processing order.
type HasPartitionID ¶ added in v0.3.0
type HasPartitionID interface {
PartitionID() int64
}
type HasPartitionKey ¶ added in v0.3.0
type HasPartitionKey interface {
// PartitionKey returns the business key used for partition hashing.
PartitionKey() string
}
HasPartitionKey is used during message production to retrieve or specify the business partition key for sharding.
type HasPayload ¶ added in v0.3.0
type HasPayload interface {
Payload() []byte
}
type HasProducer ¶ added in v0.3.0
type HasProducer interface {
ProducedBy() string
}
type HasPublishedAt ¶ added in v0.3.0
type HasRetryCount ¶ added in v0.3.0
type HasRetryCount interface {
RetryCount() uint32
}
type HasSequenceID ¶ added in v0.3.0
type HasSequenceID interface {
SequenceID() int64
}
type HasUnderlying ¶ added in v0.3.0
type HasUnderlying[T any] interface {
	Underlying() T
}
HasUnderlying retrieves the raw underlying message value. This is typically used to access the driver-specific message type provided by the MQ implementation (e.g. pulsar.Message, kafka.Message).
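Accessing the driver-specific value can be sketched with a small generic helper. `rawFrame`, `wrapped` and `underlyingOf` are hypothetical; `rawFrame` stands in for a driver-native type such as the ones mentioned above.

```go
package main

import "fmt"

// HasUnderlying mirrors the interface documented above.
type HasUnderlying[T any] interface{ Underlying() T }

// rawFrame is a hypothetical driver-native message type.
type rawFrame struct{ headers map[string]string }

// wrapped is a hypothetical unified message that keeps the native value.
type wrapped struct{ raw *rawFrame }

func (w wrapped) Underlying() *rawFrame { return w.raw }

// underlyingOf returns the driver-native value when m exposes one of type T,
// and the zero value with false otherwise.
func underlyingOf[T any](m any) (T, bool) {
	if u, ok := m.(HasUnderlying[T]); ok {
		return u.Underlying(), true
	}
	var zero T
	return zero, false
}

func main() {
	w := wrapped{raw: &rawFrame{headers: map[string]string{"trace": "abc"}}}

	f, ok := underlyingOf[*rawFrame](w)
	fmt.Println(f.headers["trace"], ok) // prints: abc true

	n, ok := underlyingOf[int](w) // wrong type parameter: no match
	fmt.Println(n, ok)            // prints: 0 false
}
```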
type Observer ¶ added in v0.3.0
type Observer[M any] interface {
	// Run starts a single consuming loop, processing each message with h.
	// It blocks until ctx is canceled or the consumer is closed and its
	// resources are released.
	Run(ctx context.Context, h SubHandler[M]) error
	// Close closes the consumer and releases related resources.
	Close() error
}
Observer is a passive consumer interface with no side effects on MQ state.
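The blocking behavior of Run can be sketched with an in-memory observer fed by a channel. The definition of SubHandler is not shown on this page, so the signature `func(context.Context, M) error` is an assumption; `chanObserver`, `errClosed` and `demo` are hypothetical, and stopping the loop on the first handler error is a choice made for this sketch only.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// SubHandler's real definition is not shown on this page; this signature is
// an assumption made for the sketch.
type SubHandler[M any] func(ctx context.Context, m M) error

var errClosed = errors.New("observer closed")

// chanObserver is a hypothetical in-memory observer fed by a channel.
type chanObserver[M any] struct {
	in   chan M
	done chan struct{}
	once sync.Once
}

func newChanObserver[M any](buf int) *chanObserver[M] {
	return &chanObserver[M]{in: make(chan M, buf), done: make(chan struct{})}
}

// Run blocks until ctx is canceled, Close is called, or h returns an error.
func (o *chanObserver[M]) Run(ctx context.Context, h SubHandler[M]) error {
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-o.done:
			return errClosed
		case m := <-o.in:
			if err := h(ctx, m); err != nil {
				return err
			}
		}
	}
}

// Close stops Run; it is safe to call more than once.
func (o *chanObserver[M]) Close() error {
	o.once.Do(func() { close(o.done) })
	return nil
}

// demo feeds three messages and cancels the context after the third.
func demo() ([]string, error) {
	o := newChanObserver[string](3)
	for _, m := range []string{"a", "b", "c"} {
		o.in <- m
	}
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	var seen []string
	err := o.Run(ctx, func(_ context.Context, m string) error {
		seen = append(seen, m)
		if len(seen) == 3 {
			cancel()
		}
		return nil
	})
	return seen, err
}

func main() {
	fmt.Println(demo()) // prints: [a b c] context canceled
}
```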
type OptionApplier ¶
type OptionApplier interface {
Apply(Option)
}
type OptionApplyFunc ¶
type OptionApplyFunc func(Option)
func (OptionApplyFunc) Apply ¶
func (f OptionApplyFunc) Apply(opt Option)
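A minimal sketch of how OptionApplyFunc can adapt a plain function into an OptionApplier. The definition of Option is not shown on this page, so it is assumed to be an open type (`any`) here, and the body of Apply is assumed to simply call the function. `producerOption`, `WithTopic` and `build` are hypothetical names; CanSetTopic mirrors the interface documented above.

```go
package main

import "fmt"

// Option is assumed to be an open type; its real definition is not shown
// on this page.
type Option any

// OptionApplier and OptionApplyFunc mirror the declarations above.
type OptionApplier interface {
	Apply(Option)
}

type OptionApplyFunc func(Option)

// Apply calls f itself (assumed body; only the signature is documented).
func (f OptionApplyFunc) Apply(opt Option) { f(opt) }

// CanSetTopic mirrors the capability interface from this package.
type CanSetTopic interface {
	SetTopic(string)
}

// producerOption is a hypothetical driver-side option struct.
type producerOption struct{ topic string }

func (o *producerOption) SetTopic(t string) { o.topic = t }

// WithTopic is a hypothetical helper. It only affects options that accept a
// topic and silently skips the rest.
func WithTopic(t string) OptionApplier {
	return OptionApplyFunc(func(opt Option) {
		if s, ok := opt.(CanSetTopic); ok {
			s.SetTopic(t)
		}
	})
}

// build applies every applier to a fresh option value, the way a factory
// method taking ...OptionApplier might.
func build(appliers ...OptionApplier) *producerOption {
	opt := &producerOption{}
	for _, a := range appliers {
		a.Apply(opt)
	}
	return opt
}

func main() {
	fmt.Println(build(WithTopic("orders")).topic) // prints: orders
}
```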
type Producer ¶ added in v0.3.0
type Producer[M any] interface {
	// Topic returns the topic name bound to this producer.
	Topic() string
	// Publish publishes payload to the given topic.
	Publish(ctx context.Context, topic string, payload []byte) (M, error)
	// PublishWithKey publishes payload with partition key to the given topic,
	// for partition ordering or load balancing.
	PublishWithKey(ctx context.Context, topic string, key string, payload []byte) (M, error)
	// PublishMessage publishes message
	PublishMessage(context.Context, M) error
	// Close closes the producer and releases connections and related resources.
	Close() error
}
Producer is the universal producer interface for message queues.
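To show the shape of an implementation, here is a minimal in-memory sketch that satisfies the interface. `memMessage`, `memProducer`, `errClosed` and `demo` are hypothetical; the sketch is not safe for concurrent use and does not reflect how any real driver behaves.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Producer mirrors the interface documented above.
type Producer[M any] interface {
	Topic() string
	Publish(ctx context.Context, topic string, payload []byte) (M, error)
	PublishWithKey(ctx context.Context, topic string, key string, payload []byte) (M, error)
	PublishMessage(context.Context, M) error
	Close() error
}

// memMessage and memProducer are hypothetical in-memory stand-ins.
type memMessage struct {
	topic, key string
	payload    []byte
}

type memProducer struct {
	topic  string
	closed bool
	sent   []*memMessage
}

var errClosed = errors.New("producer closed")

// Compile-time check that memProducer satisfies Producer.
var _ Producer[*memMessage] = (*memProducer)(nil)

func (p *memProducer) Topic() string { return p.topic }

func (p *memProducer) Publish(ctx context.Context, topic string, payload []byte) (*memMessage, error) {
	return p.PublishWithKey(ctx, topic, "", payload)
}

func (p *memProducer) PublishWithKey(ctx context.Context, topic, key string, payload []byte) (*memMessage, error) {
	m := &memMessage{topic: topic, key: key, payload: payload}
	if err := p.PublishMessage(ctx, m); err != nil {
		return nil, err
	}
	return m, nil
}

// PublishMessage rejects publishing after Close, on a canceled context,
// or with a nil message; otherwise it stores the message in memory.
func (p *memProducer) PublishMessage(ctx context.Context, m *memMessage) error {
	if p.closed {
		return errClosed
	}
	if err := ctx.Err(); err != nil {
		return err
	}
	if m == nil {
		return errors.New("invalid message")
	}
	p.sent = append(p.sent, m)
	return nil
}

func (p *memProducer) Close() error { p.closed = true; return nil }

// demo publishes twice, closes the producer, then publishes once more.
func demo() (int, error) {
	ctx := context.Background()
	p := &memProducer{topic: "orders"}
	_, _ = p.Publish(ctx, p.Topic(), []byte("a"))
	_, _ = p.PublishWithKey(ctx, p.Topic(), "user-1", []byte("b"))
	_ = p.Close()
	_, err := p.Publish(ctx, p.Topic(), []byte("c"))
	return len(p.sent), err
}

func main() {
	fmt.Println(demo()) // prints: 2 producer closed
}
```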
type PubSub ¶
type PubSub[PM any, CM any] interface {
	// NewProducer creates and returns a producer; topic and other options may be set via OptionApplier.
	NewProducer(context.Context, ...OptionApplier) (Producer[PM], error)
	// NewConsumer creates and returns a consumer; topic, subscription name, consume mode, etc. may be set via OptionApplier.
	NewConsumer(context.Context, ...OptionApplier) (Consumer[CM], error)
	// Close closes the pub/sub endpoint and releases underlying connection pools and resources.
	Close() error
}
PubSub defines the universal MQ client interface, using a factory pattern for Producer and Consumer. PM is the produced message type and CM is the consumed message type. Instances are created via NewProducer and NewConsumer and configured with OptionApplier.
type ReleaseOption ¶ added in v0.3.0
type ReleaseOption struct {
Unsub bool
}
type ReleaseOptionFunc ¶ added in v0.3.0
type ReleaseOptionFunc func(*ReleaseOption)
func WithUnsubscribe ¶ added in v0.3.0
func WithUnsubscribe() ReleaseOptionFunc
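The release options follow the functional-option pattern. In the sketch below the types mirror the declarations above, but the body of WithUnsubscribe is not shown on this page, so setting `Unsub` to true is an assumption; `resolve` is a hypothetical helper showing how an implementation of Release might fold its arguments.

```go
package main

import "fmt"

// ReleaseOption and ReleaseOptionFunc mirror the declarations above.
type ReleaseOption struct {
	Unsub bool
}

type ReleaseOptionFunc func(*ReleaseOption)

// WithUnsubscribe is assumed to request unsubscribing before release;
// only its signature is documented.
func WithUnsubscribe() ReleaseOptionFunc {
	return func(o *ReleaseOption) { o.Unsub = true }
}

// resolve folds the given option functions into a ReleaseOption,
// skipping nil entries.
func resolve(fns ...ReleaseOptionFunc) ReleaseOption {
	var opt ReleaseOption
	for _, fn := range fns {
		if fn != nil {
			fn(&opt)
		}
	}
	return opt
}

func main() {
	fmt.Println(resolve().Unsub)                  // prints: false
	fmt.Println(resolve(WithUnsubscribe()).Unsub) // prints: true
}
```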
type ResourceManager ¶ added in v0.3.0
type ResourceManager interface {
ConsumerCount() int
ProducerCount() int
ObserverCount() int
AddConsumer(Resource)
AddProducer(Resource)
AddObserver(Resource)
Unsubscribe(Resource) error
CloseConsumer(Resource) error
CloseProducer(Resource) error
CloseObserver(Resource) error
Close() error
}
ResourceManager maintains all Consumer, Producer and Observer created by PubSub
Example ¶
package main
import (
"container/list"
"fmt"
"github.com/xoctopus/confx/pkg/types/mq"
)
type MockResource struct {
name string
idx int
elem *list.Element
}
func (r *MockResource) Elem() *list.Element {
return r.elem
}
func (r *MockResource) SetElem(e *list.Element) {
r.elem = e
}
func (r *MockResource) Release(appliers ...mq.ReleaseOptionFunc) error {
opt := &mq.ReleaseOption{}
for _, applier := range appliers {
applier(opt)
}
if !opt.Unsub {
fmt.Printf("\tresource %s:%d released\n", r.name, r.idx)
} else {
fmt.Printf("\tresource %s:%d unsubscribed and released\n", r.name, r.idx)
}
return nil
}
func main() {
rm := mq.NewResourceManager()
defer func() {
fmt.Println("Close resource manager:")
_ = rm.Close()
fmt.Printf("\tproducers: %d\n", rm.ProducerCount())
fmt.Printf("\tconsumers: %d\n", rm.ConsumerCount())
fmt.Printf("\tobservers: %d\n", rm.ObserverCount())
}()
fmt.Println("Empty resource:")
fmt.Printf("\tproducers: %d\n", rm.ProducerCount())
fmt.Printf("\tconsumers: %d\n", rm.ConsumerCount())
fmt.Printf("\tobservers: %d\n", rm.ObserverCount())
rm.AddProducer(&MockResource{name: "producer", idx: 1})
p := &MockResource{name: "producer", idx: 2}
rm.AddProducer(p)
rm.AddConsumer(&MockResource{name: "consumer", idx: 1})
c1 := &MockResource{name: "consumer", idx: 2}
rm.AddConsumer(c1)
c2 := &MockResource{name: "consumer", idx: 3}
rm.AddConsumer(c2)
rm.AddObserver(&MockResource{name: "observer", idx: 1})
o := &MockResource{name: "observer", idx: 2}
rm.AddObserver(o)
fmt.Println("After add resources:")
fmt.Printf("\tproducers: %d\n", rm.ProducerCount())
fmt.Printf("\tconsumers: %d\n", rm.ConsumerCount())
fmt.Printf("\tobservers: %d\n", rm.ObserverCount())
fmt.Println("Close or Unsubscribe resources:")
_ = rm.CloseProducer(p)
_ = rm.CloseConsumer(c1)
_ = rm.Unsubscribe(c2)
_ = rm.CloseObserver(o)
fmt.Println("After releases:")
fmt.Printf("\tproducers: %d\n", rm.ProducerCount())
fmt.Printf("\tconsumers: %d\n", rm.ConsumerCount())
fmt.Printf("\tobservers: %d\n", rm.ObserverCount())
}
Output:
Empty resource:
	producers: 0
	consumers: 0
	observers: 0
After add resources:
	producers: 2
	consumers: 3
	observers: 2
Close or Unsubscribe resources:
	resource producer:2 released
	resource consumer:2 released
	resource consumer:3 unsubscribed and released
	resource observer:2 released
After releases:
	producers: 1
	consumers: 1
	observers: 1
Close resource manager:
	resource consumer:1 released
	resource producer:1 released
	resource observer:1 released
	producers: 0
	consumers: 0
	observers: 0
func NewResourceManager ¶ added in v0.3.0
func NewResourceManager() ResourceManager
type SubCallback ¶ added in v0.3.0
type SubCallback[CM any] func(Acknowledger[CM], CM, error)
type Suite ¶ added in v0.3.1
type Suite[PM any, CM any] interface {
	// Topic returns the topic name bound to this producer.
	Topic() string
	// Publish publishes payload to the given topic.
	Publish(ctx context.Context, topic string, payload []byte) (PM, error)
	// PublishMessage publishes message
	PublishMessage(context.Context, PM) error
	Run(ctx context.Context, h SubHandler[CM]) error
	Acknowledger[CM]
	Close() error
}
type Unsubscriber ¶ added in v0.3.0
type Unsubscriber interface {
Unsubscribe() error
}
Unsubscriber releases the subscription resource at the broker end.