Documentation
¶
Overview ¶
Package pubsubmutex implements a thread-safe, in-memory, multi-topic publish-subscribe system that provides type safety for each topic.
This package allows you to create a single pub/sub instance that can handle different data types on different topics. It ensures that you can only publish or subscribe to a topic with the data type it was registered with.
Key Features ¶
Type-Safe Topics: Register each topic with a specific Go type. The generic `Publish` and `Subscribe` functions then check this type at runtime, returning an error if there is a mismatch.
Centralized Management: Use a single `PubSub` instance to manage multiple, type-safe topic systems.
Thread Safety: All operations are safe for concurrent use by multiple goroutines.
Reference Counting & Cleanup: Message data can implement the `ManagedItem[T]` interface. The system will automatically call `Ref()` when creating new references for subscribers and `Cleanup()` when a message reference is dropped, preventing resource leaks.
Configurable Delivery: Configure topics to either drop messages or block with a timeout if a subscriber's buffer is full. This can be done at registration time or later via `UpdateTopic`.
Subscriber Self-Cleanup: Subscriber instances can unsubscribe themselves via the `Unsubscribe()` method.
Usage Examples ¶
Here are some examples demonstrating how to use the type-safe API.
Initialization and Topic Registration ¶
First, create a `PubSub` instance. Before you can use a topic, you must register it with the specific data type it will carry. You can optionally provide a configuration at the same time.
// Create a new PubSub system.
ps := pubsubmutex.NewPubSub()
defer ps.Close() // Best practice to defer Close().
// Define the types for your topics.
type UserUpdate struct{ UserID int; NewEmail string }
type OrderEvent struct{ OrderID string; Status string }
// Register a topic with default settings.
pubsubmutex.RegisterTopic[UserUpdate](ps, "user.updates")
// Register another topic and configure it to drop messages if buffers are full.
pubsubmutex.RegisterTopic[OrderEvent](ps, "order.events", pubsubmutex.TopicConfig{AllowDropping: true})
Type-Safe Subscribing and Publishing ¶
Once a topic is registered, you can use the generic `Subscribe` and `Publish` functions. They will return an error at runtime if you use the wrong type.
// Assumes 'ps' is created and topics are registered from the previous example.
var wg sync.WaitGroup
// Subscribe to the "user.updates" topic, getting a type-safe subscriber.
userSub, err := pubsubmutex.Subscribe[UserUpdate](ps, "user.updates", "user-service", 10)
if err != nil {
// Handle error
}
// This call will fail at runtime and return an error because the topic
// "user.updates" is registered for UserUpdate, not OrderEvent.
orderSub, err := pubsubmutex.Subscribe[OrderEvent](ps, "user.updates", "order-service", 10)
if err != nil {
fmt.Println("Correctly caught type mismatch error:", err)
}
wg.Add(1)
go func() {
defer wg.Done()
for msg := range userSub.Ch { // msg is of type pubsubmutex.Message[UserUpdate]
fmt.Printf("User update received: ID=%d, NewEmail=%s\n", msg.Data.UserID, msg.Data.NewEmail)
}
}()
// Publishing is also checked at runtime.
updateMsg := pubsubmutex.Message[UserUpdate]{
Topic: "user.updates",
Data: UserUpdate{UserID: 123, NewEmail: "new.email@example.com"},
}
err = pubsubmutex.Publish(ps, updateMsg)
if err != nil {
// Handle error
}
// This publish will fail at runtime and return an error.
errorMsg := pubsubmutex.Message[OrderEvent]{
Topic: "user.updates",
Data: OrderEvent{OrderID: "xyz"},
}
err = pubsubmutex.Publish(ps, errorMsg)
if err != nil {
fmt.Println("Correctly caught publish type mismatch error:", err)
}
Using ManagedItem for Resource Cleanup ¶
If your message data holds a resource (like a file handle or pointer), you can implement the `ManagedItem[T]` interface to manage its lifecycle.
// Define a type that holds a resource and a reference count.
type ResourcefulMessage struct {
ID int
refCount int32
}
// Implement the ManagedItem interface.
func (rm *ResourcefulMessage) Ref() *ResourcefulMessage {
atomic.AddInt32(&rm.refCount, 1)
return rm
}
func (rm *ResourcefulMessage) Cleanup() {
if atomic.AddInt32(&rm.refCount, -1) == 0 {
fmt.Printf("Final cleanup for resource ID: %d\n", rm.ID)
// Here you would close the file handle, etc.
}
}
// ... later in your code ...
topic := "resource.topic"
pubsubmutex.RegisterTopic[*ResourcefulMessage](ps, topic)
// Publish to a topic with no subscribers. The message will be dropped,
// and its Cleanup() method will be called automatically.
resourceMsg := pubsubmutex.Message[*ResourcefulMessage]{
Topic: topic,
Data: &ResourcefulMessage{ID: 1, refCount: 1},
}
pubsubmutex.Publish(ps, resourceMsg)
// Output will include: Final cleanup for resource ID: 1
Index ¶
- Constants
- func Publish[T any](ps *PubSub, message Message[T]) error
- func RegisterTopic[T any](ps *PubSub, topic string, config ...TopicConfig) error
- func SendReceive[ReqT, ResT any](ps *PubSub, sendTopic string, receiveTopic string, requestData ReqT, ...) (ResT, bool)
- func SetDebug(enable bool)
- type ManagedItem
- type Message
- type PubSub
- type Subscriber
- type TopicConfig
Constants ¶
const DefaultPublishTimeout = 500 * time.Millisecond
DefaultPublishTimeout is the default duration a publisher will wait.
Variables ¶
This section is empty.
Functions ¶
func Publish ¶ added in v1.11.0
Publish provides a type-safe way to publish a message. It ensures the message's type matches the registered type for the topic.
func RegisterTopic ¶ added in v1.11.0
func RegisterTopic[T any](ps *PubSub, topic string, config ...TopicConfig) error
RegisterTopic associates a topic string with a specific data type. You must register a topic's type before publishing or subscribing to it. An optional TopicConfig can be provided to set behaviors like message dropping or timeouts.
func SendReceive ¶ added in v1.11.0
func SendReceive[ReqT, ResT any]( ps *PubSub, sendTopic string, receiveTopic string, requestData ReqT, timeoutMs int, ) (ResT, bool)
SendReceive provides a type-safe implementation of a request-response pattern. It subscribes to a 'receiveTopic' expecting a response of type ResT, publishes a request of type ReqT to a 'sendTopic', and waits for a response. It returns the response and true if successful, or the zero value of ResT and false on timeout. Note: Both the send and receive topics must be registered with their respective types beforehand.
Types ¶
type ManagedItem ¶ added in v1.11.0
type ManagedItem[T any] interface { Ref() T Cleanup() }
ManagedItem is an optional generic interface for message data that requires reference counting and explicit cleanup. The generic type T ensures that Ref() returns the concrete type of the data.
type Message ¶ added in v1.1.0
Message represents a message to be published. It is generic over the type of its data payload.
type PubSub ¶ added in v1.1.0
type PubSub struct {
// contains filtered or unexported fields
}
PubSub provides a type-safe, multi-topic publish-subscribe system. It manages topic-to-type registration and ensures that publish and subscribe operations are type-checked.
func NewPubSub ¶ added in v1.1.0
func NewPubSub() *PubSub
NewPubSub creates a new type-safe pub/sub system. This is the main entry point for creating a new pub/sub instance.
func (*PubSub) Close ¶ added in v1.1.0
func (ps *PubSub) Close()
Close gracefully shuts down the underlying PubSub system.
func (*PubSub) GetUniqueSubscriberID ¶ added in v1.1.0
GetUniqueSubscriberID generates a new unique subscriber ID.
func (*PubSub) UpdateTopic ¶ added in v1.11.0
func (ps *PubSub) UpdateTopic(topic string, config TopicConfig)
UpdateTopic changes or sets the configuration for an already registered topic. This is useful for changing behaviors like message dropping or timeouts at runtime.
type Subscriber ¶ added in v1.1.0
type Subscriber[T any] struct { ID string Topic string Ch chan Message[T] // contains filtered or unexported fields }
Subscriber represents a client subscribed to a topic. It is generic over the type of data it receives.
func Subscribe ¶ added in v1.11.0
func Subscribe[T any](ps *PubSub, topic string, subscriberID string, bufferSize int) (*Subscriber[T], error)
Subscribe provides a type-safe way to subscribe to a topic. It returns a strongly-typed Subscriber[T] if the topic has been registered with type T.
func (*Subscriber[T]) ReadMessages ¶ added in v1.1.0
func (s *Subscriber[T]) ReadMessages(handler func(Message[T]))
ReadMessages is a helper function for clients to continuously read messages.
func (*Subscriber[T]) Unsubscribe ¶ added in v1.3.0
func (s *Subscriber[T]) Unsubscribe()
Unsubscribe signals the PubSub system to remove and clean up this subscriber instance.
type TopicConfig ¶ added in v1.1.0
TopicConfig allows configuring behavior for a specific topic.