Documentation ¶
Overview ¶
Package bus provides event bus functionality for publish/subscribe messaging with CQC protobuf events. It supports multiple backends: in-memory for testing and NATS JetStream for production.
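Example usage with the in-memory backend:
    // Named eventBus so the variable does not shadow the bus package used below.
    eventBus := bus.NewMemory()
    defer eventBus.Close()
    // Publish an event.
    event := &events.AssetCreated{Id: "btc", Name: "Bitcoin"}
    if err := eventBus.Publish(ctx, "cqc.events.v1.asset_created", event); err != nil {
        log.Fatal(err)
    }
    // Subscribe to events.
    err := eventBus.Subscribe(ctx, "cqc.events.v1.asset_created",
        bus.HandlerFunc(func(ctx context.Context, msg proto.Message) error {
            // Use the comma-ok form so an unexpected type returns an error instead of panicking.
            event, ok := msg.(*events.AssetCreated)
            if !ok {
                return fmt.Errorf("unexpected message type %T", msg)
            }
            return processAssetCreated(ctx, event)
        }),
    )
    if err != nil {
        log.Fatal(err)
    }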
Example usage with NATS JetStream:
    cfg := config.EventBusConfig{
        Backend:    "jetstream",
        Servers:    []string{"nats://localhost:4222"},
        StreamName: "CQC_EVENTS",
    }
    // Named eventBus so the variable does not shadow the bus package used below.
    eventBus, err := bus.NewJetStream(ctx, cfg)
    if err != nil {
        log.Fatal(err)
    }
    defer eventBus.Close()
    // Use middleware for automatic retry, logging, and metrics.
    err = eventBus.Subscribe(ctx, "cqc.events.v1.price_updated",
        handler,
        bus.WithRetry(3, time.Second),
        bus.WithLogging(logger),
        bus.WithMetrics(),
    )
Index ¶
- func IsValidTopic(topic string) bool
- func ParseEventType(topic string) string
- func TopicName(eventType string) string
- type EventBus
- type HandlerFunc
- type JetStreamEventBus
- func (j *JetStreamEventBus) Check(ctx context.Context) error
- func (j *JetStreamEventBus) Close() error
- func (j *JetStreamEventBus) Publish(ctx context.Context, topic string, message proto.Message) error
- func (j *JetStreamEventBus) Subscribe(ctx context.Context, topic string, handler HandlerFunc, ...) error
- type MemoryEventBus
- type Middleware
- type RawMessage
- type SubscribeOption
- func WithErrorHandler(errorHandler func(context.Context, proto.Message, error) error) SubscribeOption
- func WithLogging(logger *logging.Logger) SubscribeOption
- func WithMetrics() SubscribeOption
- func WithRecovery() SubscribeOption
- func WithRetry(maxAttempts int, initialDelay time.Duration) SubscribeOption
- func WithTimeout(timeout time.Duration) SubscribeOption
Functions ¶
func IsValidTopic ¶
func IsValidTopic(topic string) bool
IsValidTopic reports whether a topic name follows the CQC naming convention. It returns true if the topic starts with "cqc.events.v1." and is followed by an event type.
func ParseEventType ¶
func ParseEventType(topic string) string
ParseEventType extracts the event type from a topic name. It returns the event type if the topic follows the CQC naming convention, or the original topic unchanged if it does not.
Example:
eventType := bus.ParseEventType("cqc.events.v1.asset_created")
// Returns: "asset_created"
func TopicName ¶
func TopicName(eventType string) string
TopicName generates a topic name following the CQC naming convention. The event type should be in snake_case (e.g., "asset_created", "position_changed"). It returns a topic name in the format "cqc.events.v1.{event_type}".
Example:
topic := bus.TopicName("asset_created")
// Returns: "cqc.events.v1.asset_created"
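The three helpers above can be approximated with plain string handling. The following is a minimal sketch of the documented behavior, not the package's source; the constant topicPrefix and the lowercase function names are assumptions made for illustration.

```go
package main

import (
	"fmt"
	"strings"
)

// topicPrefix is the prefix the documentation above attributes to CQC topics.
const topicPrefix = "cqc.events.v1."

// topicName builds "cqc.events.v1.{event_type}" from a snake_case event type.
func topicName(eventType string) string {
	return topicPrefix + eventType
}

// isValidTopic reports whether topic has the prefix followed by a non-empty event type.
func isValidTopic(topic string) bool {
	return strings.HasPrefix(topic, topicPrefix) && len(topic) > len(topicPrefix)
}

// parseEventType returns the event type, or the topic unchanged when it
// does not match the convention.
func parseEventType(topic string) string {
	if !isValidTopic(topic) {
		return topic
	}
	return strings.TrimPrefix(topic, topicPrefix)
}

func main() {
	topic := topicName("asset_created")
	fmt.Println(topic, isValidTopic(topic), parseEventType(topic))
	fmt.Println(isValidTopic("cqc.events.v1."), parseEventType("orders.created"))
}
```

Treating a bare prefix with no event type as invalid follows the "has an event type" wording in IsValidTopic; the real function may apply stricter checks.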
Types ¶
type EventBus ¶
type EventBus interface {
    // Publish sends a protobuf message to the specified topic.
    // The message is automatically serialized to wire format before transmission.
    // Returns an error if serialization or publishing fails.
    Publish(ctx context.Context, topic string, message proto.Message) error
    // Subscribe registers a handler for messages on the specified topic.
    // The handler is invoked for each message received on the topic.
    // Messages are automatically deserialized from wire format before handler invocation.
    // Middleware options can be applied to wrap the handler with retry, logging, metrics, etc.
    // Returns an error if subscription fails.
    Subscribe(ctx context.Context, topic string, handler HandlerFunc, options ...SubscribeOption) error
    // Close releases all resources and gracefully shuts down the event bus.
    // This flushes any pending messages and closes connections.
    Close() error
}
EventBus defines the interface for publishing and subscribing to protobuf events. Methods that accept a context respect its cancellation and deadline.
type HandlerFunc ¶
HandlerFunc is the function signature for event handlers; the examples in this package use func(ctx context.Context, msg proto.Message) error. Handlers receive the deserialized protobuf message and return a non-nil error if processing fails. Returning a temporary error triggers a retry if the retry middleware is enabled.
type JetStreamEventBus ¶
type JetStreamEventBus struct {
// contains filtered or unexported fields
}
JetStreamEventBus is a NATS JetStream implementation of EventBus. It provides distributed, persistent messaging with at-least-once delivery guarantees.
func NewJetStream ¶
func NewJetStream(ctx context.Context, cfg config.EventBusConfig) (*JetStreamEventBus, error)
NewJetStream creates a new NATS JetStream event bus. It connects to the NATS servers and creates or updates the configured stream.
Example:
    cfg := config.EventBusConfig{
        Backend:    "jetstream",
        Servers:    []string{"nats://localhost:4222"},
        StreamName: "CQC_EVENTS",
    }
    // Named eventBus so the variable does not shadow the bus package.
    eventBus, err := bus.NewJetStream(ctx, cfg)
    if err != nil {
        log.Fatal(err)
    }
    defer eventBus.Close()
func (*JetStreamEventBus) Check ¶
func (j *JetStreamEventBus) Check(ctx context.Context) error
Check implements the health.Checker interface for the NATS JetStream event bus. It verifies connectivity to the NATS server by checking the connection status.
Example usage:
import "github.com/Combine-Capital/cqi/pkg/health"
h := health.New()
h.RegisterChecker("event_bus", jetStreamBus)
func (*JetStreamEventBus) Close ¶
func (j *JetStreamEventBus) Close() error
Close gracefully shuts down the event bus, stopping all consumers and closing the NATS connection.
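func (*JetStreamEventBus) Publish ¶
func (j *JetStreamEventBus) Publish(ctx context.Context, topic string, message proto.Message) error
Publish sends a protobuf message to the specified topic. The message is serialized and published to JetStream with at-least-once delivery.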
func (*JetStreamEventBus) Subscribe ¶
func (j *JetStreamEventBus) Subscribe(ctx context.Context, topic string, handler HandlerFunc, options ...SubscribeOption) error
Subscribe creates a durable consumer and starts consuming messages from the topic. Messages are automatically deserialized and passed to the handler.
type MemoryEventBus ¶
type MemoryEventBus struct {
// contains filtered or unexported fields
}
MemoryEventBus is an in-memory implementation of EventBus using Go channels. It is designed for testing and development, not for production use. Messages are delivered synchronously within the same process.
func NewMemory ¶
func NewMemory() *MemoryEventBus
NewMemory creates a new in-memory event bus. This is ideal for testing and development where you don't need distributed messaging.
Example:
    eventBus := bus.NewMemory()
    defer eventBus.Close()
func (*MemoryEventBus) Check ¶
func (m *MemoryEventBus) Check(ctx context.Context) error
Check implements the health.Checker interface for the in-memory event bus. The in-memory bus is always healthy unless it has been closed.
Example usage:
import "github.com/Combine-Capital/cqi/pkg/health"
h := health.New()
h.RegisterChecker("event_bus", memoryBus)
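The "healthy unless closed" rule can be illustrated with a small stand-in type. This is a sketch under assumptions: the Checker interface below is inferred from the documented Check signature, and memoryBus is a hypothetical type rather than the package's MemoryEventBus.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync/atomic"
)

// Checker has the single method documented above.
type Checker interface {
	Check(ctx context.Context) error
}

// memoryBus is a hypothetical stand-in that only tracks whether it was closed.
type memoryBus struct {
	closed atomic.Bool
}

// Close marks the bus as closed.
func (b *memoryBus) Close() error {
	b.closed.Store(true)
	return nil
}

// Check fails once the bus is closed or the caller's context is done.
func (b *memoryBus) Check(ctx context.Context) error {
	if err := ctx.Err(); err != nil {
		return err
	}
	if b.closed.Load() {
		return errors.New("event bus is closed")
	}
	return nil
}

// probe checks the bus through the Checker interface before and after Close.
func probe() (before, after error) {
	b := &memoryBus{}
	var c Checker = b
	before = c.Check(context.Background())
	_ = b.Close()
	after = c.Check(context.Background())
	return before, after
}

func main() {
	before, after := probe()
	fmt.Println(before, after)
}
```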
func (*MemoryEventBus) Close ¶
func (m *MemoryEventBus) Close() error
Close shuts down the event bus and closes all subscriptions.
func (*MemoryEventBus) Publish ¶
Publish sends a message to all subscribers of the given topic. The message is delivered synchronously to all handlers. Returns an error if the bus is closed.
func (*MemoryEventBus) Subscribe ¶
func (m *MemoryEventBus) Subscribe(ctx context.Context, topic string, handler HandlerFunc, options ...SubscribeOption) error
Subscribe registers a handler for the given topic. Messages published to this topic will be delivered to the handler. The handler runs in a separate goroutine.
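To make the publish/subscribe flow concrete, here is a minimal sketch of an in-memory bus built on a map of topic to handlers. It is not the package's implementation: memoryBus, errClosed, and the use of `any` in place of proto.Message are assumptions, and the sketch delivers synchronously inside Publish, which is only one reading of the description above.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// HandlerFunc uses `any` for the message so the sketch needs no protobuf dependency.
type HandlerFunc func(ctx context.Context, msg any) error

var errClosed = errors.New("event bus is closed") // hypothetical sentinel

// memoryBus is a hypothetical stand-in for an in-memory event bus.
type memoryBus struct {
	mu       sync.RWMutex
	handlers map[string][]HandlerFunc
	closed   bool
}

func newMemoryBus() *memoryBus {
	return &memoryBus{handlers: make(map[string][]HandlerFunc)}
}

// Subscribe appends the handler to the topic's handler list.
func (b *memoryBus) Subscribe(_ context.Context, topic string, h HandlerFunc) error {
	b.mu.Lock()
	defer b.mu.Unlock()
	if b.closed {
		return errClosed
	}
	b.handlers[topic] = append(b.handlers[topic], h)
	return nil
}

// Publish calls every handler for the topic in order and stops at the first error.
func (b *memoryBus) Publish(ctx context.Context, topic string, msg any) error {
	b.mu.RLock()
	if b.closed {
		b.mu.RUnlock()
		return errClosed
	}
	// Copy the slice so handlers run without holding the lock.
	hs := append([]HandlerFunc(nil), b.handlers[topic]...)
	b.mu.RUnlock()
	for _, h := range hs {
		if err := h(ctx, msg); err != nil {
			return err
		}
	}
	return nil
}

// Close drops all subscriptions and rejects later calls.
func (b *memoryBus) Close() error {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.closed = true
	b.handlers = nil
	return nil
}

// deliver publishes to a subscribed topic, an unsubscribed topic, and a closed bus.
func deliver() (received []string, afterClose error) {
	b := newMemoryBus()
	ctx := context.Background()
	_ = b.Subscribe(ctx, "cqc.events.v1.asset_created", func(_ context.Context, msg any) error {
		received = append(received, fmt.Sprint(msg))
		return nil
	})
	_ = b.Publish(ctx, "cqc.events.v1.asset_created", "btc")
	_ = b.Publish(ctx, "cqc.events.v1.price_updated", "ignored") // no subscriber
	_ = b.Close()
	afterClose = b.Publish(ctx, "cqc.events.v1.asset_created", "eth")
	return received, afterClose
}

func main() {
	fmt.Println(deliver())
}
```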
type Middleware ¶
type Middleware func(HandlerFunc) HandlerFunc
Middleware wraps a HandlerFunc to add cross-cutting concerns like retry, logging, and metrics.
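Because a Middleware takes a handler and returns a handler, several of them can be composed around one handler. The sketch below shows one common composition order; the helpers chain and tag are hypothetical, `any` stands in for proto.Message, and the order in which this package applies its own options is not documented here.

```go
package main

import (
	"context"
	"fmt"
)

// HandlerFunc and Middleware mirror the shapes described above; msg is `any`
// so the sketch has no external dependencies.
type HandlerFunc func(ctx context.Context, msg any) error
type Middleware func(HandlerFunc) HandlerFunc

// chain applies middlewares so that the first one listed is the outermost wrapper.
func chain(h HandlerFunc, mws ...Middleware) HandlerFunc {
	for i := len(mws) - 1; i >= 0; i-- {
		h = mws[i](h)
	}
	return h
}

// tag returns a middleware that records when it is entered and left.
func tag(name string, trace *[]string) Middleware {
	return func(next HandlerFunc) HandlerFunc {
		return func(ctx context.Context, msg any) error {
			*trace = append(*trace, name+":before")
			err := next(ctx, msg)
			*trace = append(*trace, name+":after")
			return err
		}
	}
}

// runChain wires two tagged middlewares around a handler and returns the call order.
func runChain() []string {
	var trace []string
	h := chain(func(ctx context.Context, msg any) error {
		trace = append(trace, "handler")
		return nil
	}, tag("outer", &trace), tag("inner", &trace))
	_ = h(context.Background(), "payload")
	return trace
}

func main() {
	fmt.Println(runChain())
}
```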
type RawMessage ¶
type RawMessage struct {
Data []byte
}
RawMessage is a wrapper for raw protobuf bytes when the type is unknown. Handlers should type assert to their expected message type after unmarshaling.
func (*RawMessage) ProtoMessage ¶
func (r *RawMessage) ProtoMessage()
ProtoMessage implements proto.Message.
func (*RawMessage) ProtoReflect ¶
func (r *RawMessage) ProtoReflect() protoreflect.Message
ProtoReflect returns the message's protoreflect.Message view.
type SubscribeOption ¶
type SubscribeOption func(*subscribeOptions)
SubscribeOption is a function that modifies subscription behavior. Options can add middleware like retry, logging, and metrics to the handler.
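SubscribeOption follows the functional options pattern: each option is a function that mutates a private settings struct. The sketch below illustrates the pattern only; the fields of subscribeOptions and the helper applyOptions are hypothetical, since the real struct is unexported and its contents are not documented.

```go
package main

import (
	"fmt"
	"time"
)

// subscribeOptions is a hypothetical settings struct for illustration.
type subscribeOptions struct {
	maxAttempts int
	timeout     time.Duration
}

// SubscribeOption mutates the settings, matching the documented type shape.
type SubscribeOption func(*subscribeOptions)

// withRetry sets the number of attempts.
func withRetry(maxAttempts int) SubscribeOption {
	return func(o *subscribeOptions) { o.maxAttempts = maxAttempts }
}

// withTimeout sets the per-message timeout.
func withTimeout(d time.Duration) SubscribeOption {
	return func(o *subscribeOptions) { o.timeout = d }
}

// applyOptions starts from defaults and lets each option mutate the settings in order.
func applyOptions(opts ...SubscribeOption) subscribeOptions {
	o := subscribeOptions{maxAttempts: 1}
	for _, opt := range opts {
		opt(&o)
	}
	return o
}

func main() {
	o := applyOptions(withRetry(3), withTimeout(30*time.Second))
	fmt.Println(o.maxAttempts, o.timeout)
}
```

The pattern keeps Subscribe's signature stable: new behavior is added as a new option function instead of a new parameter.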
func WithErrorHandler ¶
func WithErrorHandler(errorHandler func(context.Context, proto.Message, error) error) SubscribeOption
WithErrorHandler wraps a handler with custom error handling. The errorHandler is invoked when the wrapped handler returns an error. If errorHandler returns nil, the error is considered handled and won't propagate.
Example:
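    // errors is assumed here to be a project-level errors package that provides
    // IsNotFound; the standard library errors package has no such function.
    bus.Subscribe(ctx, topic, handler, bus.WithErrorHandler(func(ctx context.Context, msg proto.Message, err error) error {
        if errors.IsNotFound(err) {
            // Ignore not-found errors.
            return nil
        }
        return err
    }))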
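The wrapping described for WithErrorHandler can be sketched as a function that intercepts the handler's error and lets a callback decide the final result. This is an illustration under assumptions, not the package's code: withErrorHandler, errNotFound, and errBoom are hypothetical names, and `any` stands in for proto.Message.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

type HandlerFunc func(ctx context.Context, msg any) error

// Hypothetical sentinel errors used only by this sketch.
var (
	errNotFound = errors.New("not found")
	errBoom     = errors.New("boom")
)

// withErrorHandler calls onError only when next fails; whatever onError
// returns becomes the final result.
func withErrorHandler(next HandlerFunc, onError func(context.Context, any, error) error) HandlerFunc {
	return func(ctx context.Context, msg any) error {
		if err := next(ctx, msg); err != nil {
			return onError(ctx, msg, err)
		}
		return nil
	}
}

// ignoreNotFound swallows errNotFound and propagates everything else.
func ignoreNotFound(_ context.Context, _ any, err error) error {
	if errors.Is(err, errNotFound) {
		return nil
	}
	return err
}

// handle runs a handler that returns err through the wrapper and reports the outcome.
func handle(err error) error {
	h := withErrorHandler(func(context.Context, any) error { return err }, ignoreNotFound)
	return h(context.Background(), "payload")
}

func main() {
	fmt.Println(handle(errNotFound), handle(errBoom))
}
```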
func WithLogging ¶
func WithLogging(logger *logging.Logger) SubscribeOption
WithLogging wraps a handler with logging for message processing. It logs the start and end of message processing, including any errors.
Example:
bus.Subscribe(ctx, topic, handler, bus.WithLogging(logger))
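A logging wrapper of this kind typically logs before calling the handler and again with the outcome. The sketch below uses the standard library's log.Logger as a stand-in for the project's *logging.Logger, whose API is not shown here; withLogging, logLines, and the message wording are assumptions.

```go
package main

import (
	"bytes"
	"context"
	"errors"
	"fmt"
	"log"
)

type HandlerFunc func(ctx context.Context, msg any) error

// withLogging logs the start of processing and then either success or the error.
func withLogging(next HandlerFunc, logger *log.Logger, topic string) HandlerFunc {
	return func(ctx context.Context, msg any) error {
		logger.Printf("processing message topic=%s", topic)
		err := next(ctx, msg)
		if err != nil {
			logger.Printf("processing failed topic=%s error=%v", topic, err)
		} else {
			logger.Printf("processing succeeded topic=%s", topic)
		}
		return err
	}
}

// logLines runs one message through the wrapper and returns what was logged.
func logLines(fail bool) string {
	var buf bytes.Buffer
	logger := log.New(&buf, "", 0) // no prefix or timestamp, so output is deterministic
	h := withLogging(func(context.Context, any) error {
		if fail {
			return errors.New("boom")
		}
		return nil
	}, logger, "cqc.events.v1.price_updated")
	_ = h(context.Background(), "payload")
	return buf.String()
}

func main() {
	fmt.Print(logLines(false))
	fmt.Print(logLines(true))
}
```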
func WithMetrics ¶
func WithMetrics() SubscribeOption
WithMetrics wraps a handler with metrics collection. It records the duration and outcome (success/failure) of message processing.
Example:
bus.Subscribe(ctx, topic, handler, bus.WithMetrics())
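Recording duration and outcome amounts to timing the handler call and counting by result. The following sketch uses a hypothetical in-process recorder in place of whatever metrics backend the package reports to; withMetrics here takes the recorder explicitly, unlike the documented zero-argument option.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"
)

type HandlerFunc func(ctx context.Context, msg any) error

// recorder is a hypothetical in-process stand-in for a real metrics backend.
type recorder struct {
	mu        sync.Mutex
	successes int
	failures  int
	elapsed   time.Duration
}

// observe stores one processing result under the mutex.
func (r *recorder) observe(d time.Duration, err error) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.elapsed += d
	if err != nil {
		r.failures++
		return
	}
	r.successes++
}

// withMetrics times next and records whether it succeeded; the error passes
// through unchanged.
func withMetrics(next HandlerFunc, r *recorder) HandlerFunc {
	return func(ctx context.Context, msg any) error {
		start := time.Now()
		err := next(ctx, msg)
		r.observe(time.Since(start), err)
		return err
	}
}

// countOutcomes processes one good and one bad message and returns the tallies.
func countOutcomes() (successes, failures int) {
	r := &recorder{}
	h := withMetrics(func(_ context.Context, msg any) error {
		if msg == "bad" {
			return errors.New("cannot process")
		}
		return nil
	}, r)
	_ = h(context.Background(), "good")
	_ = h(context.Background(), "bad")
	return r.successes, r.failures
}

func main() {
	fmt.Println(countOutcomes())
}
```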
func WithRecovery ¶
func WithRecovery() SubscribeOption
WithRecovery wraps a handler with panic recovery. If the handler panics, the panic is caught and converted to an error.
Example:
bus.Subscribe(ctx, topic, handler, bus.WithRecovery())
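Converting a panic into an error relies on a deferred recover that writes to a named return value. A minimal sketch, assuming a hypothetical withRecovery helper and `any` in place of proto.Message; the error text is illustrative only.

```go
package main

import (
	"context"
	"fmt"
)

type HandlerFunc func(ctx context.Context, msg any) error

// withRecovery converts a panic in next into an ordinary error by assigning
// to the named return value inside a deferred function.
func withRecovery(next HandlerFunc) HandlerFunc {
	return func(ctx context.Context, msg any) (err error) {
		defer func() {
			if r := recover(); r != nil {
				err = fmt.Errorf("handler panicked: %v", r)
			}
		}()
		return next(ctx, msg)
	}
}

// safeCall runs a deliberately panicking handler through withRecovery.
func safeCall() error {
	h := withRecovery(func(context.Context, any) error {
		panic("unexpected nil asset")
	})
	return h(context.Background(), "payload")
}

func main() {
	fmt.Println(safeCall())
}
```

Without the named return value, the deferred function would have no way to replace the result after the panic unwinds.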
func WithRetry ¶
func WithRetry(maxAttempts int, initialDelay time.Duration) SubscribeOption
WithRetry wraps a handler with retry logic for temporary errors. It will retry the handler up to maxAttempts times with exponential backoff.
Example:
bus.Subscribe(ctx, topic, handler, bus.WithRetry(3, time.Second))
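Retry with exponential backoff can be sketched as a loop that doubles the delay after each failed attempt and stops early if the context is cancelled. The code below is an assumption-laden illustration: it counts the first call as attempt one and retries on every error, because how the package counts attempts and classifies "temporary" errors is not specified here.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

type HandlerFunc func(ctx context.Context, msg any) error

// withRetry calls next up to maxAttempts times, doubling the delay after each
// failure. This sketch retries on every error, not only temporary ones.
func withRetry(next HandlerFunc, maxAttempts int, initialDelay time.Duration) HandlerFunc {
	if maxAttempts < 1 {
		maxAttempts = 1
	}
	return func(ctx context.Context, msg any) error {
		delay := initialDelay
		var err error
		for attempt := 1; attempt <= maxAttempts; attempt++ {
			if err = next(ctx, msg); err == nil {
				return nil
			}
			if attempt == maxAttempts {
				break
			}
			select {
			case <-time.After(delay):
				delay *= 2
			case <-ctx.Done():
				return ctx.Err()
			}
		}
		return fmt.Errorf("giving up after %d attempts: %w", maxAttempts, err)
	}
}

// flaky runs a handler that fails the given number of times and reports how
// often it was called.
func flaky(failures, maxAttempts int) (calls int, err error) {
	h := withRetry(func(context.Context, any) error {
		calls++
		if calls <= failures {
			return errors.New("temporary failure")
		}
		return nil
	}, maxAttempts, time.Millisecond)
	err = h(context.Background(), "payload")
	return calls, err
}

func main() {
	fmt.Println(flaky(2, 3))
	fmt.Println(flaky(5, 3))
}
```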
func WithTimeout ¶
func WithTimeout(timeout time.Duration) SubscribeOption
WithTimeout wraps a handler with a timeout. If the handler doesn't complete within the specified duration, it returns a timeout error.
Example:
bus.Subscribe(ctx, topic, handler, bus.WithTimeout(30*time.Second))
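One way to enforce a handler timeout is to derive a context with a deadline and race the handler against it. The sketch below assumes a hypothetical withTimeout helper and returns context.DeadlineExceeded as the timeout error; the error the package actually returns is not documented here.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

type HandlerFunc func(ctx context.Context, msg any) error

// withTimeout runs next with a deadline and returns the context error if it
// has not finished in time. The handler is expected to watch ctx.Done();
// otherwise its goroutine keeps running after the wrapper has returned.
func withTimeout(next HandlerFunc, timeout time.Duration) HandlerFunc {
	return func(ctx context.Context, msg any) error {
		ctx, cancel := context.WithTimeout(ctx, timeout)
		defer cancel()

		done := make(chan error, 1) // buffered so the goroutine never blocks on send
		go func() { done <- next(ctx, msg) }()

		select {
		case err := <-done:
			return err
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}

// timedOut reports whether a handler that needs `work` time exceeds `limit`.
func timedOut(work, limit time.Duration) bool {
	h := withTimeout(func(ctx context.Context, _ any) error {
		select {
		case <-time.After(work):
			return nil
		case <-ctx.Done():
			return ctx.Err()
		}
	}, limit)
	return errors.Is(h(context.Background(), "payload"), context.DeadlineExceeded)
}

func main() {
	fmt.Println(timedOut(200*time.Millisecond, 10*time.Millisecond))
	fmt.Println(timedOut(time.Millisecond, 200*time.Millisecond))
}
```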