Documentation
¶
Overview ¶
Package bus provides an interface for core publish-subscribe messaging.
It is designed to be a simple, high-level abstraction over a message broker, offering common patterns like fire-and-forget publishing, streaming subscriptions, and request-reply.
Basic Usage:
// Configuration (replace with your actual values)
cfg := &bus.Config{
NATSURL: "nats://127.0.0.1:4222",
}
// Create a new Messenger instance
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
messenger, err := bus.NewPubSub(ctx, cfg)
if err != nil {
log.Fatalf("Failed to connect: %v", err)
}
defer messenger.Close()
// --- Publish a message ---
err = messenger.Publish(context.Background(), "updates.topic", []byte("hello world"))
if err != nil {
log.Printf("Publish failed: %v", err)
}
// --- Stream messages ---
msgChan := make(chan []byte, 64)
streamCtx, streamCancel := context.WithCancel(context.Background())
defer streamCancel()
sub, err := messenger.Stream(streamCtx, "updates.topic", msgChan)
if err != nil {
log.Fatalf("Stream failed: %v", err)
}
defer sub.Unsubscribe()
// --- Serve requests ---
handler := func(ctx context.Context, data []byte) ([]byte, error) {
log.Printf("Handler received: %s", string(data))
return []byte("ack"), nil
}
serveCtx, serveCancel := context.WithCancel(context.Background())
defer serveCancel()
serveSub, err := messenger.Serve(serveCtx, "service.topic", handler)
if err != nil {
log.Fatalf("Serve failed: %v", err)
}
defer serveSub.Unsubscribe()
Index ¶
Constants ¶
This section is empty.
Variables ¶
var ( // ErrConnectionClosed is returned when an operation is attempted on a closed connection. ErrConnectionClosed = errors.New("connection closed") // ErrStreamSubscriptionFail is returned when a stream subscription fails. ErrStreamSubscriptionFail = errors.New("stream subscription failed") // ErrMessagePublish is returned when publishing a message fails for reasons other than a closed connection. ErrMessagePublish = errors.New("message publishing failed") // ErrRequestTimeout is returned when a request-reply operation times out. ErrRequestTimeout = errors.New("request timed out") )
Functions ¶
func SetupNatsInstance ¶
Types ¶
type Handler ¶ added in v0.0.3
Handler is a function that processes a request and returns a response. It is used by the Serve method to handle incoming requests.
type Messenger ¶
type Messenger interface {
// Publish sends a fire-and-forget message to a given subject.
Publish(ctx context.Context, subject string, data []byte) error
// Stream creates a subscription to a subject and delivers messages asynchronously
// to the provided channel. The subscription is automatically managed and will
// be closed when the provided context is canceled.
Stream(ctx context.Context, subject string, ch chan<- []byte) (Subscription, error)
// Request sends a request message and waits for a reply. The context can be
// used to set a timeout or to cancel the request.
Request(ctx context.Context, subject string, data []byte) ([]byte, error)
// Serve registers a handler for a given subject to respond to requests.
// It starts a worker that listens for requests and executes the handler.
// The returned Subscription can be used to stop serving.
Serve(ctx context.Context, subject string, handler Handler) (Subscription, error)
// Close disconnects from the messaging server and cleans up any underlying resources.
Close() error
}
Messenger defines a high-level interface for various messaging patterns. It is designed for real-time event notifications, triggering ephemeral tasks, and distributing lightweight messages between services.
func NewTestPubSub ¶
NewTestPubSub starts a NATS container using SetupNatsInstance, creates a new PubSub instance, and returns it along with a cleanup function.
type Subscription ¶
type Subscription interface {
// Unsubscribe removes the subscription, stopping the delivery of messages.
Unsubscribe() error
}
Subscription represents an active subscription to a subject.