Documentation ¶
Overview ¶
Package bus provides an interface for core publish-subscribe messaging.
It is designed to be a simple, high-level abstraction over a message broker, offering common patterns like fire-and-forget publishing, streaming subscriptions, and request-reply.
Basic Usage:
// Configuration (replace with your actual values)
cfg := &bus.Config{
	NATSURL: "nats://127.0.0.1:4222",
}

// Create a new Messenger instance
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
messenger, err := bus.NewPubSub(ctx, cfg)
if err != nil {
	log.Fatalf("Failed to connect: %v", err)
}
defer messenger.Close()

// --- Publish a message ---
err = messenger.Publish(context.Background(), "updates.topic", []byte("hello world"))
if err != nil {
	log.Printf("Publish failed: %v", err)
}

// --- Stream messages ---
msgChan := make(chan []byte, 64)
streamCtx, streamCancel := context.WithCancel(context.Background())
defer streamCancel()
sub, err := messenger.Stream(streamCtx, "updates.topic", msgChan)
if err != nil {
	log.Fatalf("Stream failed: %v", err)
}
defer sub.Unsubscribe()

// --- Serve requests ---
handler := func(ctx context.Context, data []byte) ([]byte, error) {
	log.Printf("Handler received: %s", string(data))
	return []byte("ack"), nil
}
serveCtx, serveCancel := context.WithCancel(context.Background())
defer serveCancel()
serveSub, err := messenger.Serve(serveCtx, "service.topic", handler)
if err != nil {
	log.Fatalf("Serve failed: %v", err)
}
defer serveSub.Unsubscribe()
Index ¶
- Variables
- func SetupNatsInstance(ctx context.Context) (string, testcontainers.Container, func(), error)
- type Config
- type Handler
- type InMem
- func (p *InMem) Close() error
- func (p *InMem) Publish(ctx context.Context, subject string, data []byte) error
- func (p *InMem) Request(ctx context.Context, subject string, data []byte) ([]byte, error)
- func (p *InMem) Serve(ctx context.Context, subject string, handler Handler) (Subscription, error)
- func (p *InMem) Stream(ctx context.Context, subject string, ch chan<- []byte) (Subscription, error)
- type Messenger
- type SQLiteBus
- func (b *SQLiteBus) Close() error
- func (b *SQLiteBus) Publish(ctx context.Context, subject string, data []byte) error
- func (b *SQLiteBus) Request(ctx context.Context, subject string, data []byte) ([]byte, error)
- func (b *SQLiteBus) Serve(ctx context.Context, subject string, handler Handler) (Subscription, error)
- func (b *SQLiteBus) Stream(ctx context.Context, subject string, ch chan<- []byte) (Subscription, error)
- type Subscription
Constants ¶
This section is empty.
Variables ¶
var (
	// ErrConnectionClosed is returned when an operation is attempted on a closed connection.
	ErrConnectionClosed = errors.New("connection closed")
	// ErrStreamSubscriptionFail is returned when a stream subscription fails.
	ErrStreamSubscriptionFail = errors.New("stream subscription failed")
	// ErrMessagePublish is returned when publishing a message fails for reasons other than a closed connection.
	ErrMessagePublish = errors.New("message publishing failed")
	// ErrRequestTimeout is returned when a request-reply operation times out.
	ErrRequestTimeout = errors.New("request timed out")
)
Functions ¶
func SetupNatsInstance ¶
Types ¶
type Handler ¶
Handler is a function that processes a request and returns a response. It is used by the Serve method to handle incoming requests.
type InMem ¶
type InMem struct {
// contains filtered or unexported fields
}
InMem is an in-memory implementation of Messenger for single-process use. It does not use NATS or any network. Publish delivers to local Stream subscribers; Request/Serve work as same-process request-reply.
func NewInMem ¶
func NewInMem() *InMem
NewInMem returns a new in-memory Messenger. Use for local single-process mode (no NATS).
func (*InMem) Publish ¶
Publish sends a fire-and-forget message to all Stream subscribers for the subject.
func (*InMem) Request ¶
Request sends a request and waits for a reply from a Serve handler on the subject.
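To make the fan-out idea concrete, the following toy shows how publishing to local subscribers can work in a single process. It is a sketch only and not the InMem implementation: the type toyBus, its methods, and the policy of dropping a message when a subscriber's buffer is full are all assumptions made for illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// toyBus is an illustrative fan-out, not the package's InMem type.
type toyBus struct {
	mu   sync.RWMutex
	subs map[string][]chan<- []byte
}

func newToyBus() *toyBus {
	return &toyBus{subs: make(map[string][]chan<- []byte)}
}

// subscribe registers ch to receive messages published on subject.
func (b *toyBus) subscribe(subject string, ch chan<- []byte) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.subs[subject] = append(b.subs[subject], ch)
}

// publish copies data to every subscriber of subject and reports how many
// subscribers accepted it. A subscriber whose buffer is full is skipped
// (an assumed drop policy) so one slow reader cannot block the publisher.
func (b *toyBus) publish(subject string, data []byte) int {
	b.mu.RLock()
	defer b.mu.RUnlock()
	delivered := 0
	for _, ch := range b.subs[subject] {
		msg := append([]byte(nil), data...) // each subscriber gets its own copy
		select {
		case ch <- msg:
			delivered++
		default:
		}
	}
	return delivered
}

func main() {
	bus := newToyBus()
	a := make(chan []byte, 1)
	c := make(chan []byte, 1)
	bus.subscribe("updates.topic", a)
	bus.subscribe("updates.topic", c)

	n := bus.publish("updates.topic", []byte("hello"))
	fmt.Println(n, string(<-a), string(<-c))
}
```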
type Messenger ¶
type Messenger interface {
// Publish sends a fire-and-forget message to a given subject.
Publish(ctx context.Context, subject string, data []byte) error
// Stream creates a subscription to a subject and delivers messages asynchronously
// to the provided channel. The subscription is automatically managed and will
// be closed when the provided context is canceled.
Stream(ctx context.Context, subject string, ch chan<- []byte) (Subscription, error)
// Request sends a request message and waits for a reply. The context can be
// used to set a timeout or to cancel the request.
Request(ctx context.Context, subject string, data []byte) ([]byte, error)
// Serve registers a handler for a given subject to respond to requests.
// It starts a worker that listens for requests and executes the handler.
// The returned Subscription can be used to stop serving.
Serve(ctx context.Context, subject string, handler Handler) (Subscription, error)
// Close disconnects from the messaging server and cleans up any underlying resources.
Close() error
}
Messenger defines a high-level interface for various messaging patterns. It is designed for real-time event notifications, triggering ephemeral tasks, and distributing lightweight messages between services.
func NewTestPubSub ¶
NewTestPubSub starts a NATS container using SetupNatsInstance, creates a new PubSub instance, and returns it along with a cleanup function.
type SQLiteBus ¶ added in v0.4.0
type SQLiteBus struct {
// contains filtered or unexported fields
}
SQLiteBus implements Messenger over a SQLite database.
Schema tables (bus_events, bus_requests, bus_replies) must exist before use. They are part of runtimetypes.SchemaSQLite and are created automatically when the CLI database is opened.
Usage:
bus := libbus.NewSQLite(dbManager.WithoutTransaction())
defer bus.Close()
func NewSQLite ¶ added in v0.4.0
func NewSQLite(exec sqlExec) *SQLiteBus
NewSQLite creates a SQLite-backed Messenger. exec must be the result of dbManager.WithoutTransaction(), which satisfies sqlExec.
func (*SQLiteBus) Close ¶ added in v0.4.0
Close stops all background goroutines. The underlying database is NOT closed (it is owned by the caller who provided the sqlExec).
func (*SQLiteBus) Publish ¶ added in v0.4.0
Publish inserts a row into bus_events so Stream subscribers can pick it up.
func (*SQLiteBus) Request ¶ added in v0.4.0
Request inserts a request row and polls for the reply until the ctx deadline or a 10s timeout is reached.
type Subscription ¶
type Subscription interface {
// Unsubscribe removes the subscription, stopping the delivery of messages.
Unsubscribe() error
}
Subscription represents an active subscription to a subject.