bus

package
v0.2.0 Latest

This package is not in the latest version of its module.

Published: Oct 15, 2025 License: MIT Imports: 13 Imported by: 0

Documentation

Overview

Package bus provides event bus functionality for publish/subscribe messaging with CQC protobuf events. It supports multiple backends: in-memory for testing and NATS JetStream for production.

Example usage with in-memory backend:

eventBus := bus.NewMemory()
defer eventBus.Close()

// Publish an event
event := &events.AssetCreated{Id: "btc", Name: "Bitcoin"}
err := eventBus.Publish(ctx, "cqc.events.v1.asset_created", event)

// Subscribe to events
err = eventBus.Subscribe(ctx, "cqc.events.v1.asset_created",
    bus.HandlerFunc(func(ctx context.Context, msg proto.Message) error {
        // A checked assertion returns an error on an unexpected type instead of panicking.
        event, ok := msg.(*events.AssetCreated)
        if !ok {
            return fmt.Errorf("unexpected message type %T", msg)
        }
        return processAssetCreated(ctx, event)
    }),
)

Example usage with NATS JetStream:

cfg := config.EventBusConfig{
    Backend:    "jetstream",
    Servers:    []string{"nats://localhost:4222"},
    StreamName: "CQC_EVENTS",
}

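// Name the variable eventBus so it does not shadow the bus package,
// which is still needed for the middleware options below.
eventBus, err := bus.NewJetStream(ctx, cfg)
if err != nil {
    log.Fatal(err)
}
defer eventBus.Close()

// Use middleware for automatic retry and logging
err = eventBus.Subscribe(ctx, "cqc.events.v1.price_updated",
    handler,
    bus.WithRetry(3, time.Second),
    bus.WithLogging(logger),
    bus.WithMetrics(),
)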


Constants

This section is empty.

Variables

This section is empty.

Functions

func IsValidTopic

func IsValidTopic(topic string) bool

IsValidTopic reports whether a topic name follows the CQC naming convention. It returns true if the topic starts with "cqc.events.v1." and the prefix is followed by an event type.

func ParseEventType

func ParseEventType(topic string) string

ParseEventType extracts the event type from a topic name. Returns the event type if the topic follows the CQC naming convention, or the original topic if it doesn't match the expected format.

Example:

eventType := bus.ParseEventType("cqc.events.v1.asset_created")
// Returns: "asset_created"

func TopicName

func TopicName(eventType string) string

TopicName generates a topic name following the CQC naming convention. The event type should be in snake_case (e.g., "asset_created", "position_changed"). Returns a topic name in the format "cqc.events.v1.{event_type}".

Example:

topic := bus.TopicName("asset_created")
// Returns: "cqc.events.v1.asset_created"
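The three topic helpers above can be approximated with a few string operations. The following is a minimal sketch based only on the behavior described in this documentation, not the package's actual source; the lower-case names `topicPrefix`, `topicName`, `isValidTopic`, and `parseEventType` are hypothetical stand-ins.

```go
package main

import (
	"fmt"
	"strings"
)

// topicPrefix is the prefix quoted in the documentation above.
const topicPrefix = "cqc.events.v1."

// topicName prepends the prefix to a snake_case event type.
func topicName(eventType string) string {
	return topicPrefix + eventType
}

// isValidTopic reports whether topic has the prefix followed by a
// non-empty event type (an assumption about what "has an event type" means).
func isValidTopic(topic string) bool {
	return strings.HasPrefix(topic, topicPrefix) && len(topic) > len(topicPrefix)
}

// parseEventType returns the event type, or the topic unchanged
// when it does not follow the naming convention.
func parseEventType(topic string) string {
	if !isValidTopic(topic) {
		return topic
	}
	return strings.TrimPrefix(topic, topicPrefix)
}

func main() {
	topic := topicName("asset_created")
	fmt.Println(topic, isValidTopic(topic), parseEventType(topic))
	fmt.Println(parseEventType("orders.created"))
}
```

In this sketch, `topicName` and `parseEventType` are inverses for any topic that passes `isValidTopic`.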

Types

type EventBus

type EventBus interface {
	// Publish sends a protobuf message to the specified topic.
	// The message is automatically serialized to wire format before transmission.
	// Returns an error if serialization or publishing fails.
	Publish(ctx context.Context, topic string, message proto.Message) error

	// Subscribe registers a handler for messages on the specified topic.
	// The handler is invoked for each message received on the topic.
	// Messages are automatically deserialized from wire format before handler invocation.
	// Middleware options can be applied to wrap the handler with retry, logging, metrics, etc.
	// Returns an error if subscription fails.
	Subscribe(ctx context.Context, topic string, handler HandlerFunc, options ...SubscribeOption) error

	// Close releases all resources and gracefully shuts down the event bus.
	// This flushes any pending messages and closes connections.
	Close() error
}

EventBus defines the interface for publishing and subscribing to protobuf events. All methods respect context cancellation and timeout.

type HandlerFunc

type HandlerFunc func(ctx context.Context, message proto.Message) error

HandlerFunc is the function signature for event handlers. Handlers receive the deserialized protobuf message and must return an error if processing fails. Returning a temporary error triggers retry if retry middleware is enabled.

type JetStreamEventBus

type JetStreamEventBus struct {
	// contains filtered or unexported fields
}

JetStreamEventBus is a NATS JetStream implementation of EventBus. It provides distributed, persistent messaging with at-least-once delivery guarantees.

func NewJetStream

func NewJetStream(ctx context.Context, cfg config.EventBusConfig) (*JetStreamEventBus, error)

NewJetStream creates a new NATS JetStream event bus. It connects to the NATS servers and creates/updates the configured stream.

Example:

cfg := config.EventBusConfig{
    Backend:    "jetstream",
    Servers:    []string{"nats://localhost:4222"},
    StreamName: "CQC_EVENTS",
}

// eventBus avoids shadowing the bus package.
eventBus, err := bus.NewJetStream(ctx, cfg)
if err != nil {
    log.Fatal(err)
}
defer eventBus.Close()

func (*JetStreamEventBus) Check

func (j *JetStreamEventBus) Check(ctx context.Context) error

Check implements the health.Checker interface for the NATS JetStream event bus. It verifies connectivity to the NATS server by checking the connection status.

Example usage:

import "github.com/Combine-Capital/cqi/pkg/health"

h := health.New()
h.RegisterChecker("event_bus", jetStreamBus)

func (*JetStreamEventBus) Close

func (j *JetStreamEventBus) Close() error

Close gracefully shuts down the event bus, stopping all consumers and closing the NATS connection.

func (*JetStreamEventBus) Publish

func (j *JetStreamEventBus) Publish(ctx context.Context, topic string, message proto.Message) error

Publish sends a protobuf message to the specified topic. The message is serialized and published to JetStream with at-least-once delivery.

func (*JetStreamEventBus) Subscribe

func (j *JetStreamEventBus) Subscribe(ctx context.Context, topic string, handler HandlerFunc, options ...SubscribeOption) error

Subscribe creates a durable consumer and starts consuming messages from the topic. Messages are automatically deserialized and passed to the handler.

type MemoryEventBus

type MemoryEventBus struct {
	// contains filtered or unexported fields
}

MemoryEventBus is an in-memory implementation of EventBus using Go channels. It is designed for testing and development, not for production use. Messages are delivered synchronously within the same process.

func NewMemory

func NewMemory() *MemoryEventBus

NewMemory creates a new in-memory event bus. This is ideal for testing and development where you don't need distributed messaging.

Example:

eventBus := bus.NewMemory()
defer eventBus.Close()

func (*MemoryEventBus) Check

func (m *MemoryEventBus) Check(ctx context.Context) error

Check implements the health.Checker interface for the in-memory event bus. The in-memory bus is always healthy unless it has been closed.

Example usage:

import "github.com/Combine-Capital/cqi/pkg/health"

h := health.New()
h.RegisterChecker("event_bus", memoryBus)

func (*MemoryEventBus) Close

func (m *MemoryEventBus) Close() error

Close shuts down the event bus and closes all subscriptions.

func (*MemoryEventBus) Publish

func (m *MemoryEventBus) Publish(ctx context.Context, topic string, message proto.Message) error

Publish sends a message to all subscribers of the given topic. The message is delivered synchronously to all handlers. Returns an error if the bus is closed.

func (*MemoryEventBus) Subscribe

func (m *MemoryEventBus) Subscribe(ctx context.Context, topic string, handler HandlerFunc, options ...SubscribeOption) error

Subscribe registers a handler for the given topic. Messages published to this topic will be delivered to the handler. The handler runs in a separate goroutine.

type Middleware

type Middleware func(HandlerFunc) HandlerFunc

Middleware wraps a HandlerFunc to add cross-cutting concerns like retry, logging, and metrics.

type RawMessage

type RawMessage struct {
	Data []byte
}

RawMessage is a wrapper for raw protobuf bytes when the message type is unknown. Handlers should unmarshal Data into their expected message type.

func (*RawMessage) ProtoMessage

func (r *RawMessage) ProtoMessage()

ProtoMessage implements proto.Message.

func (*RawMessage) ProtoReflect

func (r *RawMessage) ProtoReflect() protoreflect.Message

ProtoReflect implements protoreflect.ProtoMessage.

func (*RawMessage) Reset

func (r *RawMessage) Reset()

Reset implements proto.Message.

func (*RawMessage) String

func (r *RawMessage) String() string

String implements proto.Message.

type SubscribeOption

type SubscribeOption func(*subscribeOptions)

SubscribeOption is a function that modifies subscription behavior. Options can add middleware like retry, logging, and metrics to the handler.

func WithErrorHandler

func WithErrorHandler(errorHandler func(context.Context, proto.Message, error) error) SubscribeOption

WithErrorHandler wraps a handler with custom error handling. The errorHandler is invoked when the wrapped handler returns an error. If errorHandler returns nil, the error is considered handled and won't propagate.

Example:

eventBus.Subscribe(ctx, topic, handler, bus.WithErrorHandler(func(ctx context.Context, msg proto.Message, err error) error {
    if errors.IsNotFound(err) {
        // Ignore not found errors
        return nil
    }
    return err
}))

func WithLogging

func WithLogging(logger *logging.Logger) SubscribeOption

WithLogging wraps a handler with logging for message processing. It logs the start and end of message processing, including any errors.

Example:

eventBus.Subscribe(ctx, topic, handler, bus.WithLogging(logger))

func WithMetrics

func WithMetrics() SubscribeOption

WithMetrics wraps a handler with metrics collection. It records the duration and outcome (success/failure) of message processing.

Example:

eventBus.Subscribe(ctx, topic, handler, bus.WithMetrics())

func WithRecovery

func WithRecovery() SubscribeOption

WithRecovery wraps a handler with panic recovery. If the handler panics, the panic is caught and converted to an error.

Example:

eventBus.Subscribe(ctx, topic, handler, bus.WithRecovery())

func WithRetry

func WithRetry(maxAttempts int, initialDelay time.Duration) SubscribeOption

WithRetry wraps a handler with retry logic for temporary errors. It will retry the handler up to maxAttempts times with exponential backoff.

Example:

eventBus.Subscribe(ctx, topic, handler, bus.WithRetry(3, time.Second))

func WithTimeout

func WithTimeout(timeout time.Duration) SubscribeOption

WithTimeout wraps a handler with a timeout. If the handler doesn't complete within the specified duration, it returns a timeout error.

Example:

eventBus.Subscribe(ctx, topic, handler, bus.WithTimeout(30*time.Second))
