broker

package
v1.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 27, 2026 License: Apache-2.0 Imports: 25 Imported by: 1

Documentation

Index

Constants

View Source
const (
	DefaultSubscriberParallelism = 1
	DefaultTestParallelism       = 1
	DefaultShutdownTimeout       = 30 * time.Second
)
View Source
const (
	// ErrorChannelBufferSize is the buffer size for the error channel
	// Large enough to handle bursts without blocking
	ErrorChannelBufferSize = 100
)

Variables

This section is empty.

Functions

This section is empty.

Types

type HandlerFunc

type HandlerFunc func(ctx context.Context, event *event.Event) error

HandlerFunc is a function that handles a CloudEvent

type MetricsRecorder added in v1.1.0

type MetricsRecorder struct {
	// contains filtered or unexported fields
}

MetricsRecorder holds Prometheus metrics for the broker library.

func NewMetricsRecorder added in v1.1.0

func NewMetricsRecorder(component, version string, registerer prometheus.Registerer) *MetricsRecorder

NewMetricsRecorder creates a new MetricsRecorder and registers all metrics with the provided prometheus.Registerer. If registerer is nil, the default prometheus.DefaultRegisterer is used.

The component and version parameters are used as constant label values on all metrics, per the HyperFleet Metrics Standard.

func (*MetricsRecorder) RecordConsumed added in v1.1.0

func (m *MetricsRecorder) RecordConsumed(topic string)

RecordConsumed increments the messages consumed counter for the given topic.

func (*MetricsRecorder) RecordDuration added in v1.1.0

func (m *MetricsRecorder) RecordDuration(topic string, duration time.Duration)

RecordDuration observes the message processing duration for the given topic.

func (*MetricsRecorder) RecordError added in v1.1.0

func (m *MetricsRecorder) RecordError(topic, errorType string)

RecordError increments the errors counter for the given topic and error type.

func (*MetricsRecorder) RecordPublished added in v1.1.0

func (m *MetricsRecorder) RecordPublished(topic string)

RecordPublished increments the messages published counter for the given topic.

type Publisher

type Publisher interface {
	// Publish publishes a CloudEvent to the specified topic with context
	Publish(ctx context.Context, topic string, event *event.Event) error
	// Health checks if the underlying broker connection is healthy.
	// Returns nil if healthy, or an error describing the failure.
	// The provided context controls the deadline/cancellation of the check.
	Health(ctx context.Context) error
	// Close closes the underlying publisher
	Close() error
}

Publisher defines the interface for publishing CloudEvents

func NewPublisher

func NewPublisher(log logger.Logger, metrics *MetricsRecorder, configMap ...map[string]string) (Publisher, error)

NewPublisher creates a new publisher with the provided logger, metrics recorder, and optional configuration. Usage:

  • NewPublisher(logger, metrics) - uses provided logger and loads config from file
  • NewPublisher(logger, metrics, configMap) - uses provided logger with config map

type Subscriber

type Subscriber interface {
	// Subscribe subscribes to a topic and processes messages with the provided handler
	Subscribe(ctx context.Context, topic string, handler HandlerFunc) error
	// Errors returns a channel that receives errors from background operations.
	// The channel is buffered to prevent blocking the subscriber.
	// The channel is closed when Close() is called.
	// Consumers SHOULD drain this channel to prevent memory leaks.
	Errors() <-chan *SubscriberError
	// Close closes the underlying subscriber
	Close() error
}

Subscriber defines the interface for subscribing to CloudEvents

func NewSubscriber

func NewSubscriber(log logger.Logger, subscriptionID string, metrics *MetricsRecorder, configMap ...map[string]string) (Subscriber, error)

NewSubscriber creates a new subscriber with the provided logger, subscription ID, metrics recorder, and optional configuration. Usage:

  • NewSubscriber(logger, "id", metrics) - uses provided logger and loads config from file
  • NewSubscriber(logger, "id", metrics, configMap) - uses provided logger with config map

type SubscriberError added in v1.0.0

type SubscriberError struct {
	// Op is the operation that failed: "router", "connect", "receive"
	Op string

	// Topic where the error occurred
	Topic string

	// SubscriptionID of the subscriber
	SubscriptionID string

	// Err is the underlying error
	Err error

	// Timestamp is when the error occurred
	Timestamp time.Time

	// Fatal indicates if the subscriber has stopped and cannot continue operating.
	// If true, the subscriber needs to be recreated.
	Fatal bool
}

SubscriberError represents an error that occurred during message processing in background goroutines. These errors are sent via the Errors() channel.

func (*SubscriberError) Error added in v1.0.0

func (e *SubscriberError) Error() string

Error implements the error interface

func (*SubscriberError) Unwrap added in v1.0.0

func (e *SubscriberError) Unwrap() error

Unwrap implements the errors.Unwrap interface

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL