Documentation
¶
Index ¶
- func NewMessageHandler(logger *logging.Logger, logMQ LogPublisher, tenantStore DestinationGetter, ...) consumer.MessageHandler
- func NewRetryScheduler(deliverymq *DeliveryMQ, redisConfig *redis.RedisConfig, deploymentID string, ...) (scheduler.Scheduler, error)
- func WithQueue(queueConfig *mqs.QueueConfig) func(opts *DeliveryMQOption)
- type AlertMonitor
- type AttemptError
- type DeliveryInfra
- type DeliveryMQ
- type DeliveryMQOption
- type DeliveryTracer
- type DestinationGetter
- type LogPublisher
- type PostDeliveryError
- type PreDeliveryError
- type Publisher
- type RetryEventGetter
- type RetryScheduler
- type RetrySchedulerOption
- type RetryTask
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func NewMessageHandler ¶
func NewMessageHandler( logger *logging.Logger, logMQ LogPublisher, tenantStore DestinationGetter, publisher Publisher, eventTracer DeliveryTracer, retryScheduler RetryScheduler, retryBackoff backoff.Backoff, retryMaxLimit int, alertMonitor AlertMonitor, idempotence idempotence.Idempotence, ) consumer.MessageHandler
func NewRetryScheduler ¶
func NewRetryScheduler(deliverymq *DeliveryMQ, redisConfig *redis.RedisConfig, deploymentID string, pollBackoff time.Duration, logger *logging.Logger, eventGetter RetryEventGetter, opts ...RetrySchedulerOption) (scheduler.Scheduler, error)
func WithQueue ¶
func WithQueue(queueConfig *mqs.QueueConfig) func(opts *DeliveryMQOption)
Types ¶
type AlertMonitor ¶
type AlertMonitor interface {
HandleAttempt(ctx context.Context, attempt alert.DeliveryAttempt) error
}
type AttemptError ¶ added in v0.13.0
type AttemptError struct {
// contains filtered or unexported fields
}
func (*AttemptError) Error ¶ added in v0.13.0
func (e *AttemptError) Error() string
func (*AttemptError) Unwrap ¶ added in v0.13.0
func (e *AttemptError) Unwrap() error
type DeliveryInfra ¶
type DeliveryMQ ¶
type DeliveryMQ struct {
// contains filtered or unexported fields
}
func New ¶
func New(opts ...func(opts *DeliveryMQOption)) *DeliveryMQ
func (*DeliveryMQ) Publish ¶
func (q *DeliveryMQ) Publish(ctx context.Context, task models.DeliveryTask) error
func (*DeliveryMQ) Subscribe ¶
func (q *DeliveryMQ) Subscribe(ctx context.Context) (mqs.Subscription, error)
type DeliveryMQOption ¶
type DeliveryMQOption struct {
QueueConfig *mqs.QueueConfig
}
type DeliveryTracer ¶
type DeliveryTracer interface {
Deliver(ctx context.Context, task *models.DeliveryTask, destination *models.Destination) (context.Context, trace.Span)
}
type DestinationGetter ¶
type LogPublisher ¶
type PostDeliveryError ¶
type PostDeliveryError struct {
// contains filtered or unexported fields
}
func (*PostDeliveryError) Error ¶
func (e *PostDeliveryError) Error() string
func (*PostDeliveryError) Unwrap ¶
func (e *PostDeliveryError) Unwrap() error
type PreDeliveryError ¶
type PreDeliveryError struct {
// contains filtered or unexported fields
}
Error types to distinguish between different stages of delivery
func (*PreDeliveryError) Error ¶
func (e *PreDeliveryError) Error() string
func (*PreDeliveryError) Unwrap ¶
func (e *PreDeliveryError) Unwrap() error
type RetryEventGetter ¶ added in v0.13.0
type RetryEventGetter interface {
RetrieveEvent(ctx context.Context, request logstore.RetrieveEventRequest) (*models.Event, error)
}
RetryEventGetter is the interface for fetching events from logstore. This is defined separately from EventGetter in messagehandler.go to avoid circular dependencies.
type RetryScheduler ¶
type RetrySchedulerOption ¶ added in v0.13.0
type RetrySchedulerOption func(*retrySchedulerConfig)
RetrySchedulerOption is a functional option for configuring the retry scheduler.
func WithRetryVisibilityTimeout ¶ added in v0.13.0
func WithRetryVisibilityTimeout(vt uint) RetrySchedulerOption
WithRetryVisibilityTimeout sets the visibility timeout for the retry scheduler queue. This controls how long a message is hidden after being received before it becomes visible again (for retry if the executor returned an error).
type RetryTask ¶ added in v0.13.0
type RetryTask struct {
EventID string
TenantID string
DestinationID string
Attempt int
Telemetry *models.DeliveryTelemetry
}
RetryTask contains the minimal info needed to retry a delivery. The full Event data will be fetched from logstore when the retry executes.
func RetryTaskFromDeliveryTask ¶ added in v0.13.0
func RetryTaskFromDeliveryTask(task models.DeliveryTask) RetryTask
func (*RetryTask) FromString ¶ added in v0.13.0
func (*RetryTask) ToDeliveryTask ¶ added in v0.13.0
func (m *RetryTask) ToDeliveryTask(event models.Event) models.DeliveryTask