pubsub

package
v0.12.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 24, 2026 License: MIT Imports: 10 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// DefaultBatchSizeMax is the default maximum batch size for pipelining
	DefaultBatchSizeMax = 1000
)

Variables

View Source
var (
	// ErrPubSubClosed is returned when operating on a closed PubSub instance
	ErrPubSubClosed = errors.New("pubsub: closed")

	// ErrUnknownBackend is returned when the specified backend is not registered
	ErrUnknownBackend = errors.New("pubsub: unknown backend")

	// ErrInvalidConfig is returned when the configuration is invalid
	ErrInvalidConfig = errors.New("pubsub: invalid config")

	// ErrSubscriptionClosed is returned when operating on a closed subscription
	ErrSubscriptionClosed = errors.New("pubsub: subscription closed")

	// ErrTopicEmpty is returned when topic is empty
	ErrTopicEmpty = errors.New("pubsub: topic cannot be empty")

	// ErrNilMessage is returned when message is nil
	ErrNilMessage = errors.New("pubsub: message cannot be nil")

	// ErrNilHandler is returned when handler is nil
	ErrNilHandler = errors.New("pubsub: handler cannot be nil")
)

Functions

func IsBackendAvailable

func IsBackendAvailable(name Backend) bool

IsBackendAvailable checks if a backend is registered.

func RegisterBackend

func RegisterBackend(name Backend, factory Factory)

RegisterBackend registers a backend factory. This should be called during init() by backend implementations. Registering the same backend twice will panic.

Types

type Backend

type Backend string

Backend identifies the pubsub backend type

const (
	// BackendMemory uses in-memory channels for pub/sub (for testing/development)
	BackendMemory Backend = "memory"

	// BackendRedis uses Redis Pub/Sub for distributed messaging
	BackendRedis Backend = "redis"
)

func AvailableBackends

func AvailableBackends() []Backend

AvailableBackends returns a list of all registered backend names.

type Config

type Config struct {
	// Backend specifies which backend to use
	Backend Backend

	// BufferSize is the channel buffer size for subscribers (default: 100)
	BufferSize int

	// OnFull specifies behavior when subscriber buffer is full
	OnFull OverflowPolicy

	// BatchSizeMax is the maximum number of messages per pipeline batch (default: 1000)
	// Only applicable to backends that support pipelining (e.g., Redis)
	BatchSizeMax int

	// Options contains backend-specific configuration
	Options map[string]any
}

Config configures a PubSub instance

func DefaultConfig

func DefaultConfig() Config

DefaultConfig returns a Config with sensible defaults

type Factory

type Factory func(cfg Config) (PubSub, error)

Factory creates PubSub instances for a specific backend

type Handler

type Handler func(ctx context.Context, msg message.Message[any]) error

Handler processes messages from a subscription

func ChainHandler

func ChainHandler(h Handler, mws ...HandlerMiddleware) Handler

ChainHandler chains multiple middleware around a Handler Middleware is applied in the order provided (first middleware is outermost)

type HandlerMiddleware

type HandlerMiddleware func(Handler) Handler

HandlerMiddleware wraps a Handler with additional functionality

func WithHandlerTimeout

func WithHandlerTimeout(timeout time.Duration) HandlerMiddleware

WithHandlerTimeout returns a HandlerMiddleware that adds a timeout to handler execution

func WithLogging

func WithLogging(logFn LogFunc) HandlerMiddleware

func WithRecovery

func WithRecovery(onPanic func(recovered any)) HandlerMiddleware

WithRecovery returns a HandlerMiddleware that recovers from panics

type LogFunc

type LogFunc func(format string, args ...any)

WithLogging returns a HandlerMiddleware that logs message processing

type Metrics

type Metrics struct {
	// PublishTotal counts total publish operations per topic
	PublishTotal *prometheus.CounterVec

	// DeliveredTotal counts total messages delivered to subscribers per topic
	DeliveredTotal *prometheus.CounterVec

	// DroppedTotal counts messages dropped due to full buffers per topic
	DroppedTotal *prometheus.CounterVec

	// SubscribeTotal counts total subscribe operations per topic
	SubscribeTotal *prometheus.CounterVec

	// UnsubscribeTotal counts total unsubscribe operations per topic
	UnsubscribeTotal *prometheus.CounterVec

	// SubscribersGauge tracks current number of subscribers per topic
	SubscribersGauge *prometheus.GaugeVec

	// HandlerErrorTotal counts handler errors per topic
	HandlerErrorTotal *prometheus.CounterVec

	// PublishLatency tracks publish operation latency per topic
	PublishLatency *prometheus.HistogramVec

	// BatchSize tracks batch sizes for PublishBatch operations
	BatchSize *prometheus.HistogramVec

	// PipelineLatency tracks pipeline execution latency
	PipelineLatency *prometheus.HistogramVec

	// PipelineErrorTotal counts pipeline execution errors
	PipelineErrorTotal *prometheus.CounterVec
}

Metrics contains all Prometheus metrics for a PubSub instance

func NewMetrics

func NewMetrics(backend string) *Metrics

NewMetrics creates or returns the existing Metrics instance for the given backend. Metrics are registered only once per backend to avoid duplicate registration panics.

type OverflowPolicy

type OverflowPolicy int

OverflowPolicy defines behavior when subscriber buffer is full

const (
	// OverflowDrop drops the message when buffer is full (lossy, non-blocking)
	OverflowDrop OverflowPolicy = iota

	// OverflowBlock blocks until buffer has space (lossless, may slow publisher)
	OverflowBlock
)

type PubSub

type PubSub interface {
	Publisher
	Subscriber

	// Topics returns a list of all topics with active subscriptions
	Topics() []string

	// SubscriberCount returns the number of active subscribers for a topic
	SubscriberCount(topic string) int

	// Close gracefully shuts down the PubSub instance.
	// All subscriptions are closed and no more messages can be published.
	Close() error
}

PubSub combines Publisher and Subscriber interfaces

func MustNewPubSub

func MustNewPubSub(cfg Config) PubSub

MustNewPubSub is like NewPubSub but panics on error. Use this only in initialization code where failure is unrecoverable.

func NewPubSub

func NewPubSub(cfg Config) (PubSub, error)

NewPubSub creates a new PubSub instance using the configured backend. Returns ErrUnknownBackend if the backend is not registered.

type Publisher

type Publisher interface {
	// Publish sends a message to all subscribers of the topic.
	// Returns nil if there are no subscribers (message is silently dropped).
	Publish(ctx context.Context, topic string, msg message.Message[any]) error

	// PublishBatch sends multiple messages to all subscribers of the topic.
	// All messages are sent atomically if the backend supports it.
	PublishBatch(ctx context.Context, topic string, msgs []message.Message[any]) error
}

Publisher publishes messages to topics

func ChainPublisher

func ChainPublisher(p Publisher, mws ...PublisherMiddleware) Publisher

ChainPublisher chains multiple middleware around a Publisher Middleware is applied in the order provided (first middleware is outermost)

type PublisherMiddleware

type PublisherMiddleware func(Publisher) Publisher

PublisherMiddleware wraps a Publisher with additional functionality

func WithRetry

func WithRetry(maxRetries int, backoff time.Duration) PublisherMiddleware

WithRetry returns a PublisherMiddleware that retries failed publishes

func WithTimeout

func WithTimeout(timeout time.Duration) PublisherMiddleware

WithTimeout returns a PublisherMiddleware that adds a timeout to publish operations

type RedisClient

type RedisClient interface {
	redis.Cmdable
	Subscribe(ctx context.Context, channels ...string) *redis.PubSub
	PSubscribe(ctx context.Context, channels ...string) *redis.PubSub
}

RedisClient is an interface that supports both publishing and subscribing

type RedisOptions

type RedisOptions struct {
	// Addr is the Redis server address (default: "localhost:6379")
	Addr string

	// Password is the Redis password (optional)
	Password string

	// DB is the Redis database number (default: 0)
	DB int

	// Prefix is the key prefix for pubsub channels (default: "pubsub:")
	Prefix string

	// PoolSize is the maximum number of socket connections (default: 10)
	PoolSize int

	// ReadTimeout is the timeout for reading from Redis (default: 3s)
	ReadTimeout time.Duration

	// WriteTimeout is the timeout for writing to Redis (default: 3s)
	WriteTimeout time.Duration
}

RedisOptions contains Redis-specific configuration

func DefaultRedisOptions

func DefaultRedisOptions() RedisOptions

DefaultRedisOptions returns RedisOptions with sensible defaults

func RedisOptionsFromMap

func RedisOptionsFromMap(m map[string]any) RedisOptions

RedisOptionsFromMap extracts RedisOptions from a map

func (RedisOptions) ToMap

func (o RedisOptions) ToMap() map[string]any

ToMap converts RedisOptions to a map for Config.Options

type Subscriber

type Subscriber interface {
	// Subscribe returns a channel that receives all messages published to the topic.
	// The channel is closed when:
	// - The context is cancelled
	// - Unsubscribe is called on the returned Subscription
	// - The PubSub instance is closed
	//
	// Each call to Subscribe creates a new independent subscription.
	// All subscriptions receive all messages (fan-out).
	Subscribe(ctx context.Context, topic string) (Subscription, error)

	// SubscribeWithHandler registers a handler for the topic.
	// The handler is called for each message received.
	// This is a blocking call that returns when:
	// - The context is cancelled
	// - The PubSub instance is closed
	// - An unrecoverable error occurs
	//
	// Handler errors are logged but do not stop the subscription.
	SubscribeWithHandler(ctx context.Context, topic string, handler Handler) error
}

Subscriber subscribes to topics and receives messages

type SubscriberMiddleware

type SubscriberMiddleware func(Subscriber) Subscriber

SubscriberMiddleware wraps a Subscriber with additional functionality

type Subscription

type Subscription interface {
	// ID returns the unique identifier of this subscription
	ID() string

	// Topic returns the topic this subscription is listening to
	Topic() string

	// Messages returns the channel that receives messages
	// The channel is closed when the subscription ends
	Messages() <-chan message.Message[any]

	// Unsubscribe cancels this subscription and closes the message channel
	Unsubscribe() error
}

Subscription represents an active subscription to a topic

type TypedHandler

type TypedHandler[T any] func(ctx context.Context, msg message.Message[T]) error

TypedHandler processes typed messages

type TypedPubSub

type TypedPubSub[T any] interface {
	TypedPublisher[T]
	TypedSubscriber[T]
	Topics() []string
	SubscriberCount(topic string) int
	Close() error
}

TypedPubSub combines typed Publisher and Subscriber

type TypedPublisher

type TypedPublisher[T any] interface {
	// Publish sends a typed message to all subscribers
	Publish(ctx context.Context, topic string, data T) error

	// PublishBatch sends multiple typed messages
	PublishBatch(ctx context.Context, topic string, data []T) error
}

TypedPublisher provides type-safe publishing

type TypedSubscriber

type TypedSubscriber[T any] interface {
	// Subscribe returns a channel that receives typed messages
	Subscribe(ctx context.Context, topic string) (TypedSubscription[T], error)

	// SubscribeWithHandler registers a typed handler
	SubscribeWithHandler(ctx context.Context, topic string, handler TypedHandler[T]) error
}

TypedSubscriber provides type-safe subscription

type TypedSubscription

type TypedSubscription[T any] interface {
	ID() string
	Topic() string
	Messages() <-chan message.Message[T]
	Unsubscribe() error
}

TypedSubscription provides type-safe message access

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL