bus

package module
v0.0.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 20, 2025 License: Apache-2.0 Imports: 8 Imported by: 2

README

bus

Documentation

Overview

Package bus provides an interface for core publish-subscribe messaging.

It is designed to be a simple, high-level abstraction over a message broker, offering common patterns like fire-and-forget publishing, streaming subscriptions, and request-reply.

Basic Usage:

// Configuration (replace with your actual values)
cfg := &bus.Config{
	NATSURL: "nats://127.0.0.1:4222",
}

// Create a new Messenger instance
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
messenger, err := bus.NewPubSub(ctx, cfg)
if err != nil {
	log.Fatalf("Failed to connect: %v", err)
}
defer messenger.Close()

// --- Publish a message ---
err = messenger.Publish(context.Background(), "updates.topic", []byte("hello world"))
if err != nil {
	log.Printf("Publish failed: %v", err)
}

// --- Stream messages ---
msgChan := make(chan []byte, 64)
streamCtx, streamCancel := context.WithCancel(context.Background())
defer streamCancel()

sub, err := messenger.Stream(streamCtx, "updates.topic", msgChan)
if err != nil {
	log.Fatalf("Stream failed: %v", err)
}
defer sub.Unsubscribe()

// --- Serve requests ---
handler := func(ctx context.Context, data []byte) ([]byte, error) {
	log.Printf("Handler received: %s", string(data))
	return []byte("ack"), nil
}
serveCtx, serveCancel := context.WithCancel(context.Background())
defer serveCancel()
serveSub, err := messenger.Serve(serveCtx, "service.topic", handler)
if err != nil {
	log.Fatalf("Serve failed: %v", err)
}
defer serveSub.Unsubscribe()

Index

Constants

This section is empty.

Variables

View Source
var (
	// ErrConnectionClosed is returned when an operation is attempted on a closed connection.
	ErrConnectionClosed = errors.New("connection closed")
	// ErrStreamSubscriptionFail is returned when a stream subscription fails.
	ErrStreamSubscriptionFail = errors.New("stream subscription failed")
	// ErrMessagePublish is returned when publishing a message fails for reasons other than a closed connection.
	ErrMessagePublish = errors.New("message publishing failed")
	// ErrRequestTimeout is returned when a request-reply operation times out.
	ErrRequestTimeout = errors.New("request timed out")
)

Functions

func SetupNatsInstance

func SetupNatsInstance(ctx context.Context) (string, testcontainers.Container, func(), error)

Types

type Config

type Config struct {
	NATSURL      string
	NATSPassword string
	NATSUser     string
}

type Handler added in v0.0.3

type Handler func(ctx context.Context, data []byte) ([]byte, error)

Handler is a function that processes a request and returns a response. It is used by the Serve method to handle incoming requests.

type Messenger

type Messenger interface {
	// Publish sends a fire-and-forget message to a given subject.
	Publish(ctx context.Context, subject string, data []byte) error

	// Stream creates a subscription to a subject and delivers messages asynchronously
	// to the provided channel. The subscription is automatically managed and will
	// be closed when the provided context is canceled.
	Stream(ctx context.Context, subject string, ch chan<- []byte) (Subscription, error)

	// Request sends a request message and waits for a reply. The context can be
	// used to set a timeout or to cancel the request.
	Request(ctx context.Context, subject string, data []byte) ([]byte, error)

	// Serve registers a handler for a given subject to respond to requests.
	// It starts a worker that listens for requests and executes the handler.
	// The returned Subscription can be used to stop serving.
	Serve(ctx context.Context, subject string, handler Handler) (Subscription, error)

	// Close disconnects from the messaging server and cleans up any underlying resources.
	Close() error
}

Messenger defines a high-level interface for various messaging patterns. It is designed for real-time event notifications, triggering ephemeral tasks, and distributing lightweight messages between services.

func NewPubSub

func NewPubSub(ctx context.Context, cfg *Config) (Messenger, error)

func NewTestPubSub

func NewTestPubSub() (Messenger, func(), error)

NewTestPubSub starts a NATS container using SetupNatsInstance, creates a new PubSub instance, and returns it along with a cleanup function.

type Subscription

type Subscription interface {
	// Unsubscribe removes the subscription, stopping the delivery of messages.
	Unsubscribe() error
}

Subscription represents an active subscription to a subject.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL