mq

package
v0.3.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 26, 2026 License: MIT Imports: 10 Imported by: 0

Documentation

Overview

Package mq provides a unified abstraction for message queue components.

Key components:

  • message.go: Defines message attributes (Topic, Payload, Extra, Tag, etc.) enabling different MQ drivers to implement specific features via composition.
  • pubsub.go: Defines standard roles including Consumer, Producer, Observer, and Factory.
  • resource.go: Provides universal resource management, allowing resources created by the Factory to be managed through a centralized ResourceManager.

Code generated by genx:code DO NOT EDIT.

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

func CRC added in v0.2.24

func CRC(k string) uint16

func Carry

func Carry[PM any, CM any](ps PubSub[PM, CM]) contextx.Carrier

Carry wraps PubSub as a contextx.Carrier for propagation across process or service boundaries.

func Fnv added in v0.2.24

func Fnv(k string) uint16

func With

func With[PM any, CM any](ctx context.Context, ps PubSub[PM, CM]) context.Context

With injects PubSub into context and returns the new context.

Types

type Acknowledger added in v0.3.0

type Acknowledger[M any] interface {
	Ack(M) error
	Nack(M) error
}

Acknowledger provides message acknowledgment and negative acknowledgment.

Used in SubHandler or SubCallback to explicitly control delivery semantics: Ack indicates successful processing; Nack indicates failure (typically triggers retry or dead-letter).

type AcknowledgerCanDiscard added in v0.3.1

type AcknowledgerCanDiscard[M any] interface {
	Acknowledger[M]

	// Discard discards message. deliveries message to DLQ directly when biz assures
	// message M cannot be handled, such as message cannot be unmarshalled
	Discard(M) error
}

type AsyncPubCallback added in v0.3.0

type AsyncPubCallback[PM any] func(PM, error)

type CanAppendExtra added in v0.3.0

type CanAppendExtra interface {
	AddExtra(k string, v string)
}

type CanAppendTags added in v0.3.0

type CanAppendTags interface {
	AddTags(...string)
}

type CanBeReleased added in v0.3.0

type CanBeReleased interface {
	// Release close and cleanup resource. unsub denotes if unsubscribes before close
	Release(...ReleaseOptionFunc) error
}

type CanRefreshConsumedAt added in v0.3.0

type CanRefreshConsumedAt interface {
	RefreshConsumedAt()
}

type CanRefreshPublishedAt added in v0.3.0

type CanRefreshPublishedAt interface {
	RefreshPublishedAt()
}

type CanSetBacklog added in v0.3.0

type CanSetBacklog interface {
	SetBacklog(int64)
}

type CanSetConsumedAt added in v0.3.0

type CanSetConsumedAt interface {
	SetConsumedAt(time.Time)
}

type CanSetDelay added in v0.3.0

type CanSetDelay interface {
	SetDelay(time.Duration)
	SetDeliveryAt(time.Time)
}

type CanSetExpiredAt added in v0.3.1

type CanSetExpiredAt interface {
	// SetExpiredAt use epoch second timestamp
	SetExpiredAt(int64)
	// SetExpiredAfter base now
	SetExpiredAfter(time.Duration)
}

type CanSetOffset added in v0.3.0

type CanSetOffset interface {
	SetOffset(int64)
}

type CanSetOrderingKey added in v0.3.0

type CanSetOrderingKey interface {
	SetOrderingKey(string)
}

type CanSetPartitionID added in v0.3.0

type CanSetPartitionID interface {
	SetPartitionID(uint64)
}

type CanSetPartitionKey added in v0.3.0

type CanSetPartitionKey interface {
	SetPartitionKey(string)
}

type CanSetPayload added in v0.3.0

type CanSetPayload interface {
	SetPayload([]byte)
}

type CanSetProducer added in v0.3.0

type CanSetProducer interface {
	SetProducer(string)
}

type CanSetPublishedAt added in v0.3.0

type CanSetPublishedAt interface {
	SetPublishedAt(time.Time)
}

type CanSetRetryCount added in v0.3.1

type CanSetRetryCount interface {
	AddRetryCount()
	SetRetryCount(uint32)
}

type CanSetSequenceID added in v0.3.0

type CanSetSequenceID interface {
	SetSequenceID(int64)
}

type CanSetTopic added in v0.3.0

type CanSetTopic interface {
	SetTopic(string)
}

type CanSetUnderlying added in v0.3.0

type CanSetUnderlying[T any] interface {
	SetUnderlying(T)
}

type ConsumeHandleMode added in v0.3.0

type ConsumeHandleMode int
const (
	// GlobalOrdered messages are processed strictly one by one in the order
	// they were received globally.
	GlobalOrdered ConsumeHandleMode = iota
	// PartitionOrdered messages with the same partition key are processed sequentially,
	// while messages with different keys are handled in parallel.
	PartitionOrdered
	// Concurrent messages are processed in parallel with no guarantee of ordering.
	// this mode offers the highest throughput.
	Concurrent
)

type Consumer added in v0.3.0

type Consumer[M any] interface {
	Acknowledger[M]
	Observer[M]
}

Consumer is the universal subscriber interface for message queues. the type parameter M denotes the consumed message type, typically implemented by different driver, eg: pulsar, rabbitmq, kafka etc.

type Error added in v0.3.0

type Error int8

Error presents error codes of message queue +genx:code

const (
	ERROR_UNDEFINED                Error = iota
	ERROR__CLI_CLOSED                    // client closed
	ERROR__CLI_INIT_ERROR                // client init failed
	ERROR__SUB_CLOSED                    // subscriber closed
	ERROR__SUB_BOOTED                    // subscriber is already booted
	ERROR__SUB_HANDLER_PANICKED          // subscriber handler panicked
	ERROR__SUB_PARSE_MESSAGE_ERROR       // subscriber failed to parse message
	ERROR__SUB_UNSUBSCRIBED              // subscriber unsubscribed
	ERROR__PUB_CLOSED                    // publisher closed
	ERROR__PUB_INVALID_MESSAGE           // publisher got invalid message
)

func (Error) Message added in v0.3.0

func (e Error) Message() string

type Factory added in v0.3.0

type Factory[M any] = Producer[M]

type HasBacklog added in v0.3.0

type HasBacklog interface {
	Backlog() int64
}

HasBacklog presents backlog messages when consuming

type HasBrokerLatency added in v0.3.0

type HasBrokerLatency interface {
	// BrokerLatency duration from broker published to logic subscribed
	BrokerLatency() time.Duration
}

type HasConsumedAt added in v0.3.0

type HasConsumedAt interface {
	ConsumedAt() time.Time
}

type HasDelay added in v0.3.0

type HasDelay interface {
	Delay() time.Duration
}

type HasExpiredAt added in v0.3.1

type HasExpiredAt interface {
	ExpiredAt() int64
}

HasExpiredAt helps consumer to check if message is expired to decide if consumer need to handle and effectiveness

type HasExtra added in v0.3.0

type HasExtra interface {
	// Extra is used to extend message info, eg: TraceID, RetryCount etc.
	Extra() map[string]string
	ExtraValueOf(string) (string, bool)
}

type HasLatency added in v0.3.0

type HasLatency interface {
	// Latency duration from event time(first published) to logic subscribed
	Latency() time.Duration
}

type HasOffset added in v0.3.0

type HasOffset interface {
	Offset() int64
}

type HasOrderingKey added in v0.3.0

type HasOrderingKey interface {
	OrderingKey() string
}

HasOrderingKey is used during message consumption to identify the biz key that governs processing order

type HasPartitionID added in v0.3.0

type HasPartitionID interface {
	PartitionID() int64
}

type HasPartitionKey added in v0.3.0

type HasPartitionKey interface {
	// PartitionKey biz key used for partition hashing
	PartitionKey() string
}

HasPartitionKey is used during message production to retrieve or specify the biz partition key for sharding.

type HasPayload added in v0.3.0

type HasPayload interface {
	Payload() []byte
}

type HasProducer added in v0.3.0

type HasProducer interface {
	ProducedBy() string
}

type HasPublishedAt added in v0.3.0

type HasPublishedAt interface {
	PublishedAt() time.Time
}

type HasRetryCount added in v0.3.0

type HasRetryCount interface {
	RetryCount() uint32
}

type HasSequenceID added in v0.3.0

type HasSequenceID interface {
	SequenceID() int64
}

type HasTags added in v0.3.0

type HasTags interface {
	Tags() []string
}

type HasTopic added in v0.3.0

type HasTopic interface {
	Topic() string
}

type HasUnderlying added in v0.3.0

type HasUnderlying[T any] interface {
	Underlying() T
}

HasUnderlying returns or retrieves the raw underlying message value. this is typically used to access driver-specific message type provided by the MQ implementation (e.g. pulsar.Message, kafka.Message, etc.).

type Hasher added in v0.2.24

type Hasher func(string) uint16

Hasher help to hash message.Key to dispatch message to task worker

type Observer added in v0.3.0

type Observer[M any] interface {
	// Run starts once consuming loop, processing each message with h.
	// it blocks until ctx canceled or consumer is resource closed and released.
	Run(ctx context.Context, h SubHandler[M]) error
	// Close closes the consumer and releases related resource
	Close() error
}

Observer is a passive consumer interface with no side effects on MQ state.

type Option

type Option interface {
	OptionScheme() string
}

type OptionApplier

type OptionApplier interface {
	Apply(Option)
}

type OptionApplyFunc

type OptionApplyFunc func(Option)

func (OptionApplyFunc) Apply

func (f OptionApplyFunc) Apply(opt Option)

type Producer added in v0.3.0

type Producer[M any] interface {
	// Topic returns the topic name bound to this producer.
	Topic() string
	// Publish publishes payload to the given topic.
	Publish(ctx context.Context, topic string, payload []byte) (M, error)
	// PublishWithKey publishes payload with partition key to the given topic,
	// for partition ordering or load balancing.
	PublishWithKey(ctx context.Context, topic string, key string, payload []byte) (M, error)
	// PublishMessage publishes message
	PublishMessage(context.Context, M) error
	// Close closes the producer and releases connections and related resources.
	Close() error
}

Producer is the universal producer interface for message queues.

type PubSub

type PubSub[PM any, CM any] interface {
	// NewProducer creates and returns a producer; topic and other options may be set via OptionApplier.
	NewProducer(context.Context, ...OptionApplier) (Producer[PM], error)
	// NewConsumer creates and returns a consumer; topic, subscription name, consume mode, etc. may be set via OptionApplier.
	NewConsumer(context.Context, ...OptionApplier) (Consumer[CM], error)
	// Close closes the pub/sub endpoint and releases underlying connection pools and resources.
	Close() error
}

PubSub defines the universal MQ client interface using a factory pattern for Producer and Consumer. PM is the produced message type, CM is the consumed message type instances are created via NewProducer, NewConsumer and configured with OptionApplier

func From

func From[PM any, CM any](ctx context.Context) (PubSub[PM, CM], bool)

From extracts the PubSub instance from context.

func Must

func Must[PM any, CM any](ctx context.Context) PubSub[PM, CM]

Must extracts the PubSub instance from context; panics if not present.

type ReleaseOption added in v0.3.0

type ReleaseOption struct {
	Unsub bool
}

type ReleaseOptionFunc added in v0.3.0

type ReleaseOptionFunc func(*ReleaseOption)

func WithUnsubscribe added in v0.3.0

func WithUnsubscribe() ReleaseOptionFunc

type Resource added in v0.3.0

type Resource interface {
	// Elem returns element in list when removing
	Elem() *list.Element
	// SetElem set element in list when inserting
	SetElem(*list.Element)
}

type ResourceManager added in v0.3.0

type ResourceManager interface {
	ConsumerCount() int
	ProducerCount() int
	ObserverCount() int

	AddConsumer(Resource)
	AddProducer(Resource)
	AddObserver(Resource)

	Unsubscribe(Resource) error
	CloseConsumer(Resource) error
	CloseProducer(Resource) error
	CloseObserver(Resource) error
	Close() error
}

ResourceManager maintains all Consumer, Producer and Observer created by PubSub

Example
package main

import (
	"container/list"
	"fmt"

	"github.com/xoctopus/confx/pkg/types/mq"
)

type MockResource struct {
	name string
	idx  int
	elem *list.Element
}

func (r *MockResource) Elem() *list.Element {
	return r.elem
}

func (r *MockResource) SetElem(e *list.Element) {
	r.elem = e
}

func (r *MockResource) Release(appliers ...mq.ReleaseOptionFunc) error {
	opt := &mq.ReleaseOption{}
	for _, applier := range appliers {
		applier(opt)
	}

	if !opt.Unsub {
		fmt.Printf("\tresource %s:%d released\n", r.name, r.idx)
	} else {
		fmt.Printf("\tresource %s:%d unsubscribed and released\n", r.name, r.idx)
	}
	return nil
}

func main() {
	rm := mq.NewResourceManager()

	defer func() {
		fmt.Println("Close resource manager:")
		_ = rm.Close()
		fmt.Printf("\tproducers: %d\n", rm.ProducerCount())
		fmt.Printf("\tconsumers: %d\n", rm.ConsumerCount())
		fmt.Printf("\tobservers: %d\n", rm.ObserverCount())
	}()

	fmt.Println("Empty resource:")
	fmt.Printf("\tproducers: %d\n", rm.ProducerCount())
	fmt.Printf("\tconsumers: %d\n", rm.ConsumerCount())
	fmt.Printf("\tobservers: %d\n", rm.ObserverCount())

	rm.AddProducer(&MockResource{name: "producer", idx: 1})
	p := &MockResource{name: "producer", idx: 2}
	rm.AddProducer(p)
	rm.AddConsumer(&MockResource{name: "consumer", idx: 1})
	c1 := &MockResource{name: "consumer", idx: 2}
	rm.AddConsumer(c1)
	c2 := &MockResource{name: "consumer", idx: 3}
	rm.AddConsumer(c2)
	rm.AddObserver(&MockResource{name: "observer", idx: 1})
	o := &MockResource{name: "observer", idx: 2}
	rm.AddObserver(o)

	fmt.Println("After add resources:")
	fmt.Printf("\tproducers: %d\n", rm.ProducerCount())
	fmt.Printf("\tconsumers: %d\n", rm.ConsumerCount())
	fmt.Printf("\tobservers: %d\n", rm.ObserverCount())

	fmt.Println("Close or Unsubscribe resources:")
	_ = rm.CloseProducer(p)
	_ = rm.CloseConsumer(c1)
	_ = rm.Unsubscribe(c2)
	_ = rm.CloseObserver(o)

	fmt.Println("After releases:")
	fmt.Printf("\tproducers: %d\n", rm.ProducerCount())
	fmt.Printf("\tconsumers: %d\n", rm.ConsumerCount())
	fmt.Printf("\tobservers: %d\n", rm.ObserverCount())

}
Output:
Empty resource:
	producers: 0
	consumers: 0
	observers: 0
After add resources:
	producers: 2
	consumers: 3
	observers: 2
Close or Unsubscribe resources:
	resource producer:2 released
	resource consumer:2 released
	resource consumer:3 unsubscribed and released
	resource observer:2 released
After releases:
	producers: 1
	consumers: 1
	observers: 1
Close resource manager:
	resource consumer:1 released
	resource producer:1 released
	resource observer:1 released
	producers: 0
	consumers: 0
	observers: 0

func NewResourceManager added in v0.3.0

func NewResourceManager() ResourceManager

type SubCallback added in v0.3.0

type SubCallback[CM any] func(Acknowledger[CM], CM, error)

type SubHandler added in v0.3.0

type SubHandler[CM any] func(context.Context, CM) error

type Suite added in v0.3.1

type Suite[PM any, CM any] interface {
	// Topic returns the topic name bound to this producer.
	Topic() string
	// Publish publishes payload to the given topic.
	Publish(ctx context.Context, topic string, payload []byte) (PM, error)
	// PublishMessage publishes message
	PublishMessage(context.Context, PM) error

	Run(ctx context.Context, h SubHandler[CM]) error
	Acknowledger[CM]

	Close() error
}

type Unsubscriber added in v0.3.0

type Unsubscriber interface {
	Unsubscribe() error
}

Unsubscriber release subscription resource at broker end.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL