Documentation
¶
Index ¶
- Constants
- Variables
- func IsBackendAvailable(name Backend) bool
- func RegisterBackend(name Backend, factory Factory)
- type Backend
- type Config
- type Factory
- type Handler
- type HandlerMiddleware
- type LogFunc
- type Metrics
- type OverflowPolicy
- type PubSub
- type Publisher
- type PublisherMiddleware
- type RedisClient
- type RedisOptions
- type Subscriber
- type SubscriberMiddleware
- type Subscription
- type TypedHandler
- type TypedPubSub
- type TypedPublisher
- type TypedSubscriber
- type TypedSubscription
Constants ¶
const (
// DefaultBatchSizeMax is the default maximum batch size for pipelining
DefaultBatchSizeMax = 1000
)
Variables ¶
var ( // ErrPubSubClosed is returned when operating on a closed PubSub instance ErrPubSubClosed = errors.New("pubsub: closed") // ErrUnknownBackend is returned when the specified backend is not registered ErrUnknownBackend = errors.New("pubsub: unknown backend") // ErrInvalidConfig is returned when the configuration is invalid ErrInvalidConfig = errors.New("pubsub: invalid config") // ErrSubscriptionClosed is returned when operating on a closed subscription ErrSubscriptionClosed = errors.New("pubsub: subscription closed") // ErrTopicEmpty is returned when topic is empty ErrTopicEmpty = errors.New("pubsub: topic cannot be empty") // ErrNilMessage is returned when message is nil ErrNilMessage = errors.New("pubsub: message cannot be nil") // ErrNilHandler is returned when handler is nil ErrNilHandler = errors.New("pubsub: handler cannot be nil") )
Functions ¶
func IsBackendAvailable ¶
IsBackendAvailable checks if a backend is registered.
func RegisterBackend ¶
RegisterBackend registers a backend factory. This should be called during init() by backend implementations. Registering the same backend twice will panic.
Types ¶
type Backend ¶
type Backend string
Backend identifies the pubsub backend type
func AvailableBackends ¶
func AvailableBackends() []Backend
AvailableBackends returns a list of all registered backend names.
type Config ¶
type Config struct {
// Backend specifies which backend to use
Backend Backend
// BufferSize is the channel buffer size for subscribers (default: 100)
BufferSize int
// OnFull specifies behavior when subscriber buffer is full
OnFull OverflowPolicy
// BatchSizeMax is the maximum number of messages per pipeline batch (default: 1000)
// Only applicable to backends that support pipelining (e.g., Redis)
BatchSizeMax int
// Options contains backend-specific configuration
Options map[string]any
}
Config configures a PubSub instance
func DefaultConfig ¶
func DefaultConfig() Config
DefaultConfig returns a Config with sensible defaults
type Handler ¶
Handler processes messages from a subscription
func ChainHandler ¶
func ChainHandler(h Handler, mws ...HandlerMiddleware) Handler
ChainHandler chains multiple middleware around a Handler Middleware is applied in the order provided (first middleware is outermost)
type HandlerMiddleware ¶
HandlerMiddleware wraps a Handler with additional functionality
func WithHandlerTimeout ¶
func WithHandlerTimeout(timeout time.Duration) HandlerMiddleware
WithHandlerTimeout returns a HandlerMiddleware that adds a timeout to handler execution
func WithLogging ¶
func WithLogging(logFn LogFunc) HandlerMiddleware
func WithRecovery ¶
func WithRecovery(onPanic func(recovered any)) HandlerMiddleware
WithRecovery returns a HandlerMiddleware that recovers from panics
type Metrics ¶
type Metrics struct {
// PublishTotal counts total publish operations per topic
PublishTotal *prometheus.CounterVec
// DeliveredTotal counts total messages delivered to subscribers per topic
DeliveredTotal *prometheus.CounterVec
// DroppedTotal counts messages dropped due to full buffers per topic
DroppedTotal *prometheus.CounterVec
// SubscribeTotal counts total subscribe operations per topic
SubscribeTotal *prometheus.CounterVec
// UnsubscribeTotal counts total unsubscribe operations per topic
UnsubscribeTotal *prometheus.CounterVec
// SubscribersGauge tracks current number of subscribers per topic
SubscribersGauge *prometheus.GaugeVec
// HandlerErrorTotal counts handler errors per topic
HandlerErrorTotal *prometheus.CounterVec
// PublishLatency tracks publish operation latency per topic
PublishLatency *prometheus.HistogramVec
// BatchSize tracks batch sizes for PublishBatch operations
BatchSize *prometheus.HistogramVec
// PipelineLatency tracks pipeline execution latency
PipelineLatency *prometheus.HistogramVec
// PipelineErrorTotal counts pipeline execution errors
PipelineErrorTotal *prometheus.CounterVec
}
Metrics contains all Prometheus metrics for a PubSub instance
func NewMetrics ¶
NewMetrics creates or returns the existing Metrics instance for the given backend. Metrics are registered only once per backend to avoid duplicate registration panics.
type OverflowPolicy ¶
type OverflowPolicy int
OverflowPolicy defines behavior when subscriber buffer is full
const ( // OverflowDrop drops the message when buffer is full (lossy, non-blocking) OverflowDrop OverflowPolicy = iota // OverflowBlock blocks until buffer has space (lossless, may slow publisher) OverflowBlock )
type PubSub ¶
type PubSub interface {
Publisher
Subscriber
// Topics returns a list of all topics with active subscriptions
Topics() []string
// SubscriberCount returns the number of active subscribers for a topic
SubscriberCount(topic string) int
// Close gracefully shuts down the PubSub instance.
// All subscriptions are closed and no more messages can be published.
Close() error
}
PubSub combines Publisher and Subscriber interfaces
func MustNewPubSub ¶
MustNewPubSub is like NewPubSub but panics on error. Use this only in initialization code where failure is unrecoverable.
type Publisher ¶
type Publisher interface {
// Publish sends a message to all subscribers of the topic.
// Returns nil if there are no subscribers (message is silently dropped).
Publish(ctx context.Context, topic string, msg message.Message[any]) error
// PublishBatch sends multiple messages to all subscribers of the topic.
// All messages are sent atomically if the backend supports it.
PublishBatch(ctx context.Context, topic string, msgs []message.Message[any]) error
}
Publisher publishes messages to topics
func ChainPublisher ¶
func ChainPublisher(p Publisher, mws ...PublisherMiddleware) Publisher
ChainPublisher chains multiple middleware around a Publisher Middleware is applied in the order provided (first middleware is outermost)
type PublisherMiddleware ¶
PublisherMiddleware wraps a Publisher with additional functionality
func WithRetry ¶
func WithRetry(maxRetries int, backoff time.Duration) PublisherMiddleware
WithRetry returns a PublisherMiddleware that retries failed publishes
func WithTimeout ¶
func WithTimeout(timeout time.Duration) PublisherMiddleware
WithTimeout returns a PublisherMiddleware that adds a timeout to publish operations
type RedisClient ¶
type RedisClient interface {
redis.Cmdable
Subscribe(ctx context.Context, channels ...string) *redis.PubSub
PSubscribe(ctx context.Context, channels ...string) *redis.PubSub
}
RedisClient is an interface that supports both publishing and subscribing
type RedisOptions ¶
type RedisOptions struct {
// Addr is the Redis server address (default: "localhost:6379")
Addr string
// Password is the Redis password (optional)
Password string
// DB is the Redis database number (default: 0)
DB int
// Prefix is the key prefix for pubsub channels (default: "pubsub:")
Prefix string
// PoolSize is the maximum number of socket connections (default: 10)
PoolSize int
// ReadTimeout is the timeout for reading from Redis (default: 3s)
ReadTimeout time.Duration
// WriteTimeout is the timeout for writing to Redis (default: 3s)
WriteTimeout time.Duration
}
RedisOptions contains Redis-specific configuration
func DefaultRedisOptions ¶
func DefaultRedisOptions() RedisOptions
DefaultRedisOptions returns RedisOptions with sensible defaults
func RedisOptionsFromMap ¶
func RedisOptionsFromMap(m map[string]any) RedisOptions
RedisOptionsFromMap extracts RedisOptions from a map
func (RedisOptions) ToMap ¶
func (o RedisOptions) ToMap() map[string]any
ToMap converts RedisOptions to a map for Config.Options
type Subscriber ¶
type Subscriber interface {
// Subscribe returns a channel that receives all messages published to the topic.
// The channel is closed when:
// - The context is cancelled
// - Unsubscribe is called on the returned Subscription
// - The PubSub instance is closed
//
// Each call to Subscribe creates a new independent subscription.
// All subscriptions receive all messages (fan-out).
Subscribe(ctx context.Context, topic string) (Subscription, error)
// SubscribeWithHandler registers a handler for the topic.
// The handler is called for each message received.
// This is a blocking call that returns when:
// - The context is cancelled
// - The PubSub instance is closed
// - An unrecoverable error occurs
//
// Handler errors are logged but do not stop the subscription.
SubscribeWithHandler(ctx context.Context, topic string, handler Handler) error
}
Subscriber subscribes to topics and receives messages
type SubscriberMiddleware ¶
type SubscriberMiddleware func(Subscriber) Subscriber
SubscriberMiddleware wraps a Subscriber with additional functionality
type Subscription ¶
type Subscription interface {
// ID returns the unique identifier of this subscription
ID() string
// Topic returns the topic this subscription is listening to
Topic() string
// Messages returns the channel that receives messages
// The channel is closed when the subscription ends
Messages() <-chan message.Message[any]
// Unsubscribe cancels this subscription and closes the message channel
Unsubscribe() error
}
Subscription represents an active subscription to a topic
type TypedHandler ¶
TypedHandler processes typed messages
type TypedPubSub ¶
type TypedPubSub[T any] interface { TypedPublisher[T] TypedSubscriber[T] Topics() []string SubscriberCount(topic string) int Close() error }
TypedPubSub combines typed Publisher and Subscriber
type TypedPublisher ¶
type TypedPublisher[T any] interface { // Publish sends a typed message to all subscribers Publish(ctx context.Context, topic string, data T) error // PublishBatch sends multiple typed messages PublishBatch(ctx context.Context, topic string, data []T) error }
TypedPublisher provides type-safe publishing
type TypedSubscriber ¶
type TypedSubscriber[T any] interface { // Subscribe returns a channel that receives typed messages Subscribe(ctx context.Context, topic string) (TypedSubscription[T], error) // SubscribeWithHandler registers a typed handler SubscribeWithHandler(ctx context.Context, topic string, handler TypedHandler[T]) error }
TypedSubscriber provides type-safe subscription