deliverymq

package
v0.13.1 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 20, 2026 License: Apache-2.0 Imports: 20 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewMessageHandler

func NewMessageHandler(
	logger *logging.Logger,
	logMQ LogPublisher,
	tenantStore DestinationGetter,
	publisher Publisher,
	eventTracer DeliveryTracer,
	retryScheduler RetryScheduler,
	retryBackoff backoff.Backoff,
	retryMaxLimit int,
	alertMonitor AlertMonitor,
	idempotence idempotence.Idempotence,
) consumer.MessageHandler

func NewRetryScheduler

func NewRetryScheduler(deliverymq *DeliveryMQ, redisConfig *redis.RedisConfig, deploymentID string, pollBackoff time.Duration, logger *logging.Logger, eventGetter RetryEventGetter, opts ...RetrySchedulerOption) (scheduler.Scheduler, error)

func WithQueue

func WithQueue(queueConfig *mqs.QueueConfig) func(opts *DeliveryMQOption)

Types

type AlertMonitor

type AlertMonitor interface {
	HandleAttempt(ctx context.Context, attempt alert.DeliveryAttempt) error
}

type AttemptError added in v0.13.0

type AttemptError struct {
	// contains filtered or unexported fields
}

func (*AttemptError) Error added in v0.13.0

func (e *AttemptError) Error() string

func (*AttemptError) Unwrap added in v0.13.0

func (e *AttemptError) Unwrap() error

type DeliveryInfra

type DeliveryInfra interface {
	DeclareInfrastructure(ctx context.Context) error
}

type DeliveryMQ

type DeliveryMQ struct {
	// contains filtered or unexported fields
}

func New

func New(opts ...func(opts *DeliveryMQOption)) *DeliveryMQ

func (*DeliveryMQ) Init

func (q *DeliveryMQ) Init(ctx context.Context) (func(), error)

func (*DeliveryMQ) Publish

func (q *DeliveryMQ) Publish(ctx context.Context, task models.DeliveryTask) error

func (*DeliveryMQ) Subscribe

func (q *DeliveryMQ) Subscribe(ctx context.Context) (mqs.Subscription, error)

type DeliveryMQOption

type DeliveryMQOption struct {
	QueueConfig *mqs.QueueConfig
}

type DeliveryTracer

type DeliveryTracer interface {
	Deliver(ctx context.Context, task *models.DeliveryTask, destination *models.Destination) (context.Context, trace.Span)
}

type DestinationGetter

type DestinationGetter interface {
	RetrieveDestination(ctx context.Context, tenantID, destID string) (*models.Destination, error)
}

type LogPublisher

type LogPublisher interface {
	Publish(ctx context.Context, entry models.LogEntry) error
}

type PostDeliveryError

type PostDeliveryError struct {
	// contains filtered or unexported fields
}

func (*PostDeliveryError) Error

func (e *PostDeliveryError) Error() string

func (*PostDeliveryError) Unwrap

func (e *PostDeliveryError) Unwrap() error

type PreDeliveryError

type PreDeliveryError struct {
	// contains filtered or unexported fields
}

PreDeliveryError and the related error types in this package distinguish between the different stages of delivery.

func (*PreDeliveryError) Error

func (e *PreDeliveryError) Error() string

func (*PreDeliveryError) Unwrap

func (e *PreDeliveryError) Unwrap() error

type Publisher

type Publisher interface {
	PublishEvent(ctx context.Context, destination *models.Destination, event *models.Event) (*models.Attempt, error)
}

type RetryEventGetter added in v0.13.0

type RetryEventGetter interface {
	RetrieveEvent(ctx context.Context, request logstore.RetrieveEventRequest) (*models.Event, error)
}

RetryEventGetter is the interface for fetching events from the logstore. It is defined separately from EventGetter in messagehandler.go to avoid circular dependencies.

type RetryScheduler

type RetryScheduler interface {
	Schedule(ctx context.Context, task string, delay time.Duration, opts ...scheduler.ScheduleOption) error
	Cancel(ctx context.Context, taskID string) error
}

type RetrySchedulerOption added in v0.13.0

type RetrySchedulerOption func(*retrySchedulerConfig)

RetrySchedulerOption is a functional option for configuring the retry scheduler.

func WithRetryVisibilityTimeout added in v0.13.0

func WithRetryVisibilityTimeout(vt uint) RetrySchedulerOption

WithRetryVisibilityTimeout sets the visibility timeout for the retry scheduler queue. It controls how long a received message stays hidden before it becomes visible again, so that it can be retried if the executor returned an error.

type RetryTask added in v0.13.0

type RetryTask struct {
	EventID       string
	TenantID      string
	DestinationID string
	Attempt       int
	Telemetry     *models.DeliveryTelemetry
}

RetryTask contains the minimal info needed to retry a delivery. The full Event data will be fetched from logstore when the retry executes.

func RetryTaskFromDeliveryTask added in v0.13.0

func RetryTaskFromDeliveryTask(task models.DeliveryTask) RetryTask

func (*RetryTask) FromString added in v0.13.0

func (m *RetryTask) FromString(str string) error

func (*RetryTask) ToDeliveryTask added in v0.13.0

func (m *RetryTask) ToDeliveryTask(event models.Event) models.DeliveryTask

func (*RetryTask) ToString added in v0.13.0

func (m *RetryTask) ToString() (string, error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL