libbus

package
v0.6.1 Latest
Warning

This package is not in the latest version of its module.

Published: Mar 18, 2026 License: Apache-2.0 Imports: 12 Imported by: 0

Documentation

Overview

Package libbus provides an interface for core publish-subscribe messaging.

It is designed to be a simple, high-level abstraction over a message broker, offering common patterns like fire-and-forget publishing, streaming subscriptions, and request-reply.

Basic Usage:

// Configuration (replace with your actual values)
cfg := &libbus.Config{
	NATSURL: "nats://127.0.0.1:4222",
}

// Create a new Messenger instance
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
messenger, err := libbus.NewPubSub(ctx, cfg)
if err != nil {
	log.Fatalf("Failed to connect: %v", err)
}
defer messenger.Close()

// --- Publish a message ---
err = messenger.Publish(context.Background(), "updates.topic", []byte("hello world"))
if err != nil {
	log.Printf("Publish failed: %v", err)
}

// --- Stream messages ---
msgChan := make(chan []byte, 64)
streamCtx, streamCancel := context.WithCancel(context.Background())
defer streamCancel()

sub, err := messenger.Stream(streamCtx, "updates.topic", msgChan)
if err != nil {
	log.Fatalf("Stream failed: %v", err)
}
defer sub.Unsubscribe()

// --- Serve requests ---
handler := func(ctx context.Context, data []byte) ([]byte, error) {
	log.Printf("Handler received: %s", string(data))
	return []byte("ack"), nil
}
serveCtx, serveCancel := context.WithCancel(context.Background())
defer serveCancel()
serveSub, err := messenger.Serve(serveCtx, "service.topic", handler)
if err != nil {
	log.Fatalf("Serve failed: %v", err)
}
defer serveSub.Unsubscribe()


Constants

This section is empty.

Variables

var (
	// ErrConnectionClosed is returned when an operation is attempted on a closed connection.
	ErrConnectionClosed = errors.New("connection closed")
	// ErrStreamSubscriptionFail is returned when a stream subscription fails.
	ErrStreamSubscriptionFail = errors.New("stream subscription failed")
	// ErrMessagePublish is returned when publishing a message fails for reasons other than a closed connection.
	ErrMessagePublish = errors.New("message publishing failed")
	// ErrRequestTimeout is returned when a request-reply operation times out.
	ErrRequestTimeout = errors.New("request timed out")
)

Functions

func SetupNatsInstance

func SetupNatsInstance(ctx context.Context) (string, testcontainers.Container, func(), error)

Types

type Config

type Config struct {
	NATSURL      string
	NATSPassword string
	NATSUser     string
}

type Handler

type Handler func(ctx context.Context, data []byte) ([]byte, error)

Handler is a function that processes a request and returns a response. It is used by the Serve method to handle incoming requests.

type InMem

type InMem struct {
	// contains filtered or unexported fields
}

InMem is an in-memory implementation of Messenger for single-process use. It does not use NATS or any network. Publish delivers to local Stream subscribers; Request/Serve work as same-process request-reply.

func NewInMem

func NewInMem() *InMem

NewInMem returns a new in-memory Messenger. Use it for local single-process mode, which needs no NATS server.

func (*InMem) Close

func (p *InMem) Close() error

Close marks the messenger closed and releases resources.

func (*InMem) Publish

func (p *InMem) Publish(ctx context.Context, subject string, data []byte) error

Publish sends a fire-and-forget message to all Stream subscribers for the subject.

func (*InMem) Request

func (p *InMem) Request(ctx context.Context, subject string, data []byte) ([]byte, error)

Request sends a request and waits for a reply from a Serve handler on the subject.

func (*InMem) Serve

func (p *InMem) Serve(ctx context.Context, subject string, handler Handler) (Subscription, error)

Serve registers a handler for the subject. Request calls will invoke this handler.

func (*InMem) Stream

func (p *InMem) Stream(ctx context.Context, subject string, ch chan<- []byte) (Subscription, error)

Stream creates a subscription to a subject; messages are delivered to ch.
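
To make the in-memory delivery model concrete, the following is a rough sketch of subject-based fan-out to local channels. It is not the InMem implementation: miniBus and its methods are hypothetical, and dropping messages for a full channel instead of blocking is an assumption of this sketch only.

```go
package main

import (
	"fmt"
	"sync"
)

// miniBus is a hypothetical stand-in, not the InMem implementation.
type miniBus struct {
	mu   sync.RWMutex
	subs map[string][]chan<- []byte
}

func newMiniBus() *miniBus {
	return &miniBus{subs: make(map[string][]chan<- []byte)}
}

// stream registers ch as a subscriber for subject.
func (b *miniBus) stream(subject string, ch chan<- []byte) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.subs[subject] = append(b.subs[subject], ch)
}

// publish copies data to every subscriber of subject. A full channel is
// skipped rather than blocking the publisher (an assumption of this sketch).
// It returns the number of subscribers that received the message.
func (b *miniBus) publish(subject string, data []byte) int {
	b.mu.RLock()
	defer b.mu.RUnlock()
	delivered := 0
	for _, ch := range b.subs[subject] {
		msg := append([]byte(nil), data...) // each subscriber gets its own copy
		select {
		case ch <- msg:
			delivered++
		default:
		}
	}
	return delivered
}

func main() {
	b := newMiniBus()
	ch := make(chan []byte, 1)
	b.stream("updates.topic", ch)
	n := b.publish("updates.topic", []byte("hello"))
	fmt.Println(n, string(<-ch))
}
```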

type Messenger

type Messenger interface {
	// Publish sends a fire-and-forget message to a given subject.
	Publish(ctx context.Context, subject string, data []byte) error

	// Stream creates a subscription to a subject and delivers messages asynchronously
	// to the provided channel. The subscription is automatically managed and will
	// be closed when the provided context is canceled.
	Stream(ctx context.Context, subject string, ch chan<- []byte) (Subscription, error)

	// Request sends a request message and waits for a reply. The context can be
	// used to set a timeout or to cancel the request.
	Request(ctx context.Context, subject string, data []byte) ([]byte, error)

	// Serve registers a handler for a given subject to respond to requests.
	// It starts a worker that listens for requests and executes the handler.
	// The returned Subscription can be used to stop serving.
	Serve(ctx context.Context, subject string, handler Handler) (Subscription, error)

	// Close disconnects from the messaging server and cleans up any underlying resources.
	Close() error
}

Messenger defines a high-level interface for various messaging patterns. It is designed for real-time event notifications, triggering ephemeral tasks, and distributing lightweight messages between services.

func NewPubSub

func NewPubSub(ctx context.Context, cfg *Config) (Messenger, error)

func NewTestPubSub

func NewTestPubSub() (Messenger, func(), error)

NewTestPubSub starts a NATS container using SetupNatsInstance, creates a new PubSub instance, and returns it along with a cleanup function.

type SQLiteBus added in v0.4.0

type SQLiteBus struct {
	// contains filtered or unexported fields
}

SQLiteBus implements Messenger over a SQLite database.

Schema tables (bus_events, bus_requests, bus_replies) must exist before use. They are part of runtimetypes.SchemaSQLite and are created automatically when the CLI database is opened.

Usage:

bus := libbus.NewSQLite(dbManager.WithoutTransaction())
defer bus.Close()

func NewSQLite added in v0.4.0

func NewSQLite(exec sqlExec) *SQLiteBus

NewSQLite creates a SQLite-backed Messenger. exec must be the result of dbManager.WithoutTransaction(), which satisfies sqlExec.

func (*SQLiteBus) Close added in v0.4.0

func (b *SQLiteBus) Close() error

Close stops all background goroutines. The underlying database is NOT closed (it is owned by the caller who provided the sqlExec).

func (*SQLiteBus) Publish added in v0.4.0

func (b *SQLiteBus) Publish(ctx context.Context, subject string, data []byte) error

Publish inserts a row into bus_events so Stream subscribers can pick it up.

func (*SQLiteBus) Request added in v0.4.0

func (b *SQLiteBus) Request(ctx context.Context, subject string, data []byte) ([]byte, error)

Request inserts a request row and polls for the reply until ctx deadline or 10s timeout.

func (*SQLiteBus) Serve added in v0.4.0

func (b *SQLiteBus) Serve(ctx context.Context, subject string, handler Handler) (Subscription, error)

Serve registers a handler for subject. A polling goroutine picks up rows from bus_requests, calls the handler, and writes the reply to bus_replies.

func (*SQLiteBus) Stream added in v0.4.0

func (b *SQLiteBus) Stream(ctx context.Context, subject string, ch chan<- []byte) (Subscription, error)

Stream starts a polling goroutine that delivers new bus_events for subject to ch. The subscription goroutine stops when ctx is cancelled.

type Subscription

type Subscription interface {
	// Unsubscribe removes the subscription, stopping the delivery of messages.
	Unsubscribe() error
}

Subscription represents an active subscription to a subject.
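
A subscription may be stopped both by a deferred Unsubscribe call and by context cancellation, so implementations benefit from making Unsubscribe safe to call more than once. Whether the implementations in this package behave that way is not stated here. The sketch below shows one way to get that property with sync.Once; onceSub is a hypothetical type, and the interface is redeclared locally so the example compiles alone.

```go
package main

import (
	"fmt"
	"sync"
)

// Subscription mirrors the libbus interface so the sketch compiles alone.
type Subscription interface {
	Unsubscribe() error
}

// onceSub is a hypothetical Subscription whose stop function runs at most
// once; later calls return the result of the first call.
type onceSub struct {
	once sync.Once
	stop func() error
	err  error
}

func (s *onceSub) Unsubscribe() error {
	s.once.Do(func() { s.err = s.stop() })
	return s.err
}

func main() {
	stops := 0
	var sub Subscription = &onceSub{stop: func() error { stops++; return nil }}
	_ = sub.Unsubscribe()
	_ = sub.Unsubscribe() // second call is a no-op
	fmt.Println(stops)
}
```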
