Documentation
Constants ¶
const (
	DefaultSubscriberParallelism = 1
	DefaultTestParallelism       = 1
	DefaultShutdownTimeout       = 30 * time.Second
)
const (
	// ErrorChannelBufferSize is the buffer size for the error channel
	// Large enough to handle bursts without blocking
	ErrorChannelBufferSize = 100
)
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type HandlerFunc ¶
HandlerFunc is a function that handles a CloudEvent
type MetricsRecorder ¶ added in v1.1.0
type MetricsRecorder struct {
// contains filtered or unexported fields
}
MetricsRecorder holds Prometheus metrics for the broker library.
func NewMetricsRecorder ¶ added in v1.1.0
func NewMetricsRecorder(component, version string, registerer prometheus.Registerer) *MetricsRecorder
NewMetricsRecorder creates a new MetricsRecorder and registers all metrics with the provided prometheus.Registerer. If registerer is nil, the default prometheus.DefaultRegisterer is used.
The component and version parameters are used as constant label values on all metrics, per the HyperFleet Metrics Standard.
func (*MetricsRecorder) RecordConsumed ¶ added in v1.1.0
func (m *MetricsRecorder) RecordConsumed(topic string)
RecordConsumed increments the messages consumed counter for the given topic.
func (*MetricsRecorder) RecordDuration ¶ added in v1.1.0
func (m *MetricsRecorder) RecordDuration(topic string, duration time.Duration)
RecordDuration observes the message processing duration for the given topic.
func (*MetricsRecorder) RecordError ¶ added in v1.1.0
func (m *MetricsRecorder) RecordError(topic, errorType string)
RecordError increments the errors counter for the given topic and error type.
func (*MetricsRecorder) RecordPublished ¶ added in v1.1.0
func (m *MetricsRecorder) RecordPublished(topic string)
RecordPublished increments the messages published counter for the given topic.
type Publisher ¶
type Publisher interface {
// Publish publishes a CloudEvent to the specified topic with context
Publish(ctx context.Context, topic string, event *event.Event) error
// Health checks if the underlying broker connection is healthy.
// Returns nil if healthy, or an error describing the failure.
// The provided context controls the deadline/cancellation of the check.
Health(ctx context.Context) error
// Close closes the underlying publisher
Close() error
}
Publisher defines the interface for publishing CloudEvents
func NewPublisher ¶
func NewPublisher(log logger.Logger, metrics *MetricsRecorder, configMap ...map[string]string) (Publisher, error)
NewPublisher creates a new publisher with the provided logger, metrics recorder, and optional configuration. Usage:
- NewPublisher(logger, metrics) - uses provided logger and loads config from file
- NewPublisher(logger, metrics, configMap) - uses provided logger with config map
type Subscriber ¶
type Subscriber interface {
// Subscribe subscribes to a topic and processes messages with the provided handler
Subscribe(ctx context.Context, topic string, handler HandlerFunc) error
// Errors returns a channel that receives errors from background operations.
// The channel is buffered to prevent blocking the subscriber.
// The channel is closed when Close() is called.
// Consumers SHOULD drain this channel to prevent memory leaks.
Errors() <-chan *SubscriberError
// Close closes the underlying subscriber
Close() error
}
Subscriber defines the interface for subscribing to CloudEvents
func NewSubscriber ¶
func NewSubscriber(log logger.Logger, subscriptionID string, metrics *MetricsRecorder, configMap ...map[string]string) (Subscriber, error)
NewSubscriber creates a new subscriber with the provided logger, subscription ID, metrics recorder, and optional configuration. Usage:
- NewSubscriber(logger, "id", metrics) - uses provided logger and loads config from file
- NewSubscriber(logger, "id", metrics, configMap) - uses provided logger with config map
type SubscriberError ¶ added in v1.0.0
type SubscriberError struct {
// Op is the operation that failed: "router", "connect", "receive"
Op string
// Topic where the error occurred
Topic string
// SubscriptionID of the subscriber
SubscriptionID string
// Err is the underlying error
Err error
// Timestamp is when the error occurred
Timestamp time.Time
// Fatal indicates if the subscriber has stopped and cannot continue operating.
// If true, the subscriber needs to be recreated.
Fatal bool
}
SubscriberError represents an error that occurred during message processing in background goroutines. These errors are sent via the Errors() channel.
func (*SubscriberError) Error ¶ added in v1.0.0
func (e *SubscriberError) Error() string
Error implements the error interface
func (*SubscriberError) Unwrap ¶ added in v1.0.0
func (e *SubscriberError) Unwrap() error
Unwrap returns the underlying error, so that errors.Is, errors.As, and errors.Unwrap can inspect the error chain.