libbus

Version: v0.0.0-...-35541fa
Published: May 18, 2025 License: Apache-2.0 Imports: 10 Imported by: 0

Documentation

Overview

Package libbus provides an interface for core publish-subscribe messaging.

Basic Usage:

// Configuration (replace with your actual values)
cfg := &libbus.Config{
	NATSURL: nats.DefaultURL, // "nats://127.0.0.1:4222"
	// NATSUser: "user",
	// NATSPassword: "password",
}

// Create a new Messenger instance
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
messenger, err := libbus.NewPubSub(ctx, cfg)
if err != nil {
	log.Fatalf("Failed to connect to NATS: %v", err)
}
defer messenger.Close() // Ensure connection is closed

// --- Publish ---
go func() {
	time.Sleep(100 * time.Millisecond)
	log.Println("Publishing message...")
	err := messenger.Publish(context.Background(), "updates.topic", []byte("hello world"))
	if err != nil {
		log.Printf("Publish failed: %v", err)
	}
}()

// --- Stream ---
msgChan := make(chan []byte, 64)
streamCtx, streamCancel := context.WithCancel(context.Background())
defer streamCancel()

// The returned Subscription is discarded here; an unused variable would not
// compile, and unsubscribing is handled internally when streamCtx is cancelled.
_, err = messenger.Stream(streamCtx, "updates.topic", msgChan)
if err != nil {
	log.Fatalf("Stream failed: %v", err)
}

log.Println("Listening for messages...")
select {
case msgData := <-msgChan:
	log.Printf("Received message: %s", string(msgData))
	// Typically you'd loop here or handle messages in a goroutine
case <-time.After(1 * time.Second): // Timeout example
	log.Println("Timeout waiting for message")
}

// To stop the stream, cancel its context
// streamCancel()
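
The usage above reads a single message and notes that real code would typically loop. A minimal sketch of such a loop follows. The helper names consume and runDemo are hypothetical and not part of libbus, and a plain channel stands in for the one passed to Stream so the example runs without a NATS server.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// consume calls handle for every message received on ch until ctx is
// cancelled or ch is closed. It returns the number of messages handled.
func consume(ctx context.Context, ch <-chan []byte, handle func([]byte)) int {
	n := 0
	for {
		select {
		case <-ctx.Done():
			return n
		case msg, ok := <-ch:
			if !ok {
				return n
			}
			handle(msg)
			n++
		}
	}
}

// runDemo feeds two messages through consume and returns what was received.
func runDemo() []string {
	// Stand-in for the channel handed to messenger.Stream.
	msgChan := make(chan []byte, 64)
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	msgChan <- []byte("hello")
	msgChan <- []byte("world")
	close(msgChan) // only for this demo, so the loop ends promptly

	var got []string
	consume(ctx, msgChan, func(b []byte) { got = append(got, string(b)) })
	return got
}

func main() {
	for _, m := range runDemo() {
		fmt.Println("received:", m)
	}
}
```

With a real Messenger, the same loop would usually run in its own goroutine and exit when the stream context is cancelled.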

Variables

var (
	ErrConnectionClosed       = errors.New("connection closed")
	ErrStreamSubscriptionFail = errors.New("stream subscription failed")
	ErrMessagePublish         = errors.New("message publishing failed")
)
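
Sentinel errors like these are normally matched with errors.Is rather than ==, so that wrapped errors are still recognized. The sketch below redeclares the three variables locally so it compiles on its own. Whether libbus wraps them with %w is an assumption, and the classify helper and its action names are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins mirroring the sentinel errors exported by libbus.
var (
	ErrConnectionClosed       = errors.New("connection closed")
	ErrStreamSubscriptionFail = errors.New("stream subscription failed")
	ErrMessagePublish         = errors.New("message publishing failed")
)

// classify maps an error to a coarse action. It uses errors.Is, so it also
// matches errors that wrap a sentinel with %w.
func classify(err error) string {
	switch {
	case err == nil:
		return "ok"
	case errors.Is(err, ErrConnectionClosed):
		return "reconnect"
	case errors.Is(err, ErrMessagePublish):
		return "retry"
	case errors.Is(err, ErrStreamSubscriptionFail):
		return "resubscribe"
	default:
		return "fail"
	}
}

func main() {
	wrapped := fmt.Errorf("publish to updates.topic: %w", ErrMessagePublish)
	fmt.Println(classify(wrapped))
	fmt.Println(classify(ErrConnectionClosed))
	fmt.Println(classify(errors.New("something else")))
}
```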

Functions

func SetupNatsInstance

func SetupNatsInstance(ctx context.Context) (string, testcontainers.Container, func(), error)

Types

type Config

type Config struct {
	NATSURL      string
	NATSPassword string
	NATSUser     string
}
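
One way to populate Config without hard-coding credentials is to read it from the environment. This is only a sketch: the Config type is redeclared locally so the example is self-contained, the configFromEnv helper is hypothetical, and the variable names NATS_URL, NATS_USER and NATS_PASSWORD are assumptions rather than anything libbus defines.

```go
package main

import (
	"fmt"
	"os"
)

// Config mirrors the fields of libbus.Config so this sketch compiles alone.
type Config struct {
	NATSURL      string
	NATSPassword string
	NATSUser     string
}

// defaultNATSURL is the value shown next to nats.DefaultURL in Basic Usage.
const defaultNATSURL = "nats://127.0.0.1:4222"

// configFromEnv builds a Config using getenv (normally os.Getenv).
// The environment variable names are hypothetical.
func configFromEnv(getenv func(string) string) *Config {
	cfg := &Config{
		NATSURL:      getenv("NATS_URL"),
		NATSUser:     getenv("NATS_USER"),
		NATSPassword: getenv("NATS_PASSWORD"),
	}
	if cfg.NATSURL == "" {
		cfg.NATSURL = defaultNATSURL
	}
	return cfg
}

func main() {
	cfg := configFromEnv(os.Getenv)
	// The password is deliberately not printed.
	fmt.Printf("url=%s user=%q\n", cfg.NATSURL, cfg.NATSUser)
}
```

Passing the lookup function as a parameter keeps the helper easy to test with a plain map instead of the process environment.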

type Messenger

type Messenger interface {
	// Publish sends a message on the given subject.
	Publish(ctx context.Context, subject string, data []byte) error

	// Stream streams messages (using channels) from the given subject.
	Stream(ctx context.Context, subject string, ch chan<- []byte) (Subscription, error)

	// Close cleans up any underlying resources.
	Close() error
}

Typical use cases:

- Real-time event notifications (e.g., job state updates to a UI)
- Triggering ephemeral tasks (e.g., quick, non-persistent jobs)
- Distributing lightweight messages between services
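
Because Messenger is an interface, code that depends on it can be unit-tested against an in-memory stand-in instead of a NATS container. The sketch below is a hypothetical fake, not how libbus is implemented. The interfaces are redeclared locally, memBus, memSub and roundTrip are invented names, subjects are matched exactly with no wildcard support, and messages are dropped when a subscriber's buffer is full.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// Local copies of the libbus interfaces so the sketch compiles on its own.
type Subscription interface{ Unsubscribe() error }

type Messenger interface {
	Publish(ctx context.Context, subject string, data []byte) error
	Stream(ctx context.Context, subject string, ch chan<- []byte) (Subscription, error)
	Close() error
}

var errClosed = errors.New("connection closed")

// memBus is a hypothetical in-memory Messenger for tests.
type memBus struct {
	mu     sync.Mutex
	closed bool
	nextID int
	subs   map[string]map[int]chan<- []byte
}

type memSub struct {
	bus     *memBus
	subject string
	id      int
}

func newMemBus() *memBus { return &memBus{subs: map[string]map[int]chan<- []byte{}} }

// Publish delivers data to every subscriber of subject without blocking.
func (b *memBus) Publish(_ context.Context, subject string, data []byte) error {
	b.mu.Lock()
	defer b.mu.Unlock()
	if b.closed {
		return errClosed
	}
	for _, ch := range b.subs[subject] {
		select {
		case ch <- data:
		default: // subscriber buffer full: drop the message
		}
	}
	return nil
}

// Stream registers ch for subject and unsubscribes when ctx is cancelled.
func (b *memBus) Stream(ctx context.Context, subject string, ch chan<- []byte) (Subscription, error) {
	b.mu.Lock()
	defer b.mu.Unlock()
	if b.closed {
		return nil, errClosed
	}
	if b.subs[subject] == nil {
		b.subs[subject] = map[int]chan<- []byte{}
	}
	s := &memSub{bus: b, subject: subject, id: b.nextID}
	b.nextID++
	b.subs[subject][s.id] = ch
	if done := ctx.Done(); done != nil {
		go func() { <-done; s.Unsubscribe() }()
	}
	return s, nil
}

func (b *memBus) Close() error {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.closed = true
	b.subs = map[string]map[int]chan<- []byte{}
	return nil
}

// Unsubscribe removes the subscription; calling it twice is harmless.
func (s *memSub) Unsubscribe() error {
	s.bus.mu.Lock()
	defer s.bus.mu.Unlock()
	delete(s.bus.subs[s.subject], s.id)
	return nil
}

// roundTrip subscribes, publishes one message, and returns what arrived.
func roundTrip() (string, error) {
	var bus Messenger = newMemBus()
	defer bus.Close()
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	ch := make(chan []byte, 1)
	if _, err := bus.Stream(ctx, "updates.topic", ch); err != nil {
		return "", err
	}
	if err := bus.Publish(ctx, "updates.topic", []byte("hello world")); err != nil {
		return "", err
	}
	return string(<-ch), nil
}

func main() {
	msg, err := roundTrip()
	fmt.Println(msg, err)
}
```

A fake like this suits fast unit tests. NewTestPubSub remains the better fit when a test needs real NATS behavior.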

func NewPubSub

func NewPubSub(ctx context.Context, cfg *Config) (Messenger, error)

func NewTestPubSub

func NewTestPubSub(t *testing.T) (Messenger, func())

NewTestPubSub starts a NATS container using SetupNatsInstance, creates a new PubSub instance, and returns it along with a cleanup function.

type Subscription

type Subscription interface {
	Unsubscribe() error
}
