Versions in this module Expand all Collapse all v0 v0.1.0 Nov 24, 2025 Changes in this version + const ErrCodeConfiguration + const ErrCodeDatabase + const ErrCodeDelivery + const ErrCodeNoData + const ErrCodeValidation + var ErrInvalidConfiguration = &Error + var ErrNoData = &Error + var MigrationFiles embed.FS + func IsNoData(err error) bool + type DLQRepository interface + CountUnresolved func(ctx context.Context) (int, error) + Delete func(ctx context.Context, m model.DeadLetterQueue) error + FindByMessageID func(ctx context.Context, messageID int64) (model.DeadLetterQueue, error) + FindBySubscription func(ctx context.Context, subscriptionID int64, limit int) ([]model.DeadLetterQueue, error) + FindOlderThan func(ctx context.Context, threshold time.Duration, limit int) ([]model.DeadLetterQueue, error) + FindUnresolved func(ctx context.Context, limit int) ([]model.DeadLetterQueue, error) + GetStats func(ctx context.Context) (model.DLQStats, error) + Load func(ctx context.Context, id int64) (model.DeadLetterQueue, error) + Save func(ctx context.Context, m model.DeadLetterQueue) (model.DeadLetterQueue, error) + type Error struct + Code string + Err error + Message string + func NewError(code, message string) *Error + func NewErrorWithCause(code, message string, cause error) *Error + func (e *Error) Error() string + func (e *Error) Unwrap() error + type Filter struct + CbuID int + IsActive bool + SubscriberID int + TopicID string + type Logger interface + Debugf func(format string, args ...interface{}) + Errorf func(format string, args ...interface{}) + Info func(message string) + Infof func(format string, args ...interface{}) + Warnf func(format string, args ...interface{}) + type LoggingNotificationService struct + func NewLoggingNotificationService(logger Logger) *LoggingNotificationService + func (n *LoggingNotificationService) NotifyDLQItemAdded(_ context.Context, dlq model.DeadLetterQueue) error + func (n *LoggingNotificationService) NotifyDeliveryFailure(_ context.Context, queue *model.Queue, err error) error + func (n *LoggingNotificationService) NotifySubscriptionCreated(_ context.Context, subscription model.Subscription) error + func (n *LoggingNotificationService) NotifySubscriptionDeactivated(_ context.Context, subscription model.Subscription) error + type MessageDeliveryGateway interface + DeliverMessage func(ctx context.Context, callbackURL string, message *model.DataMessage) error + type MessageRepository interface + Delete func(ctx context.Context, m model.Message) error + FindOutdatedMessages func(ctx context.Context, days int) ([]model.Message, error) + Load func(ctx context.Context, id int64) (model.Message, error) + Save func(ctx context.Context, m model.Message) (model.Message, error) + type NoOpNotificationService struct + func (n *NoOpNotificationService) NotifyDLQItemAdded(_ context.Context, _ model.DeadLetterQueue) error + func (n *NoOpNotificationService) NotifyDeliveryFailure(_ context.Context, _ *model.Queue, _ error) error + func (n *NoOpNotificationService) NotifySubscriptionCreated(_ context.Context, _ model.Subscription) error + func (n *NoOpNotificationService) NotifySubscriptionDeactivated(_ context.Context, _ model.Subscription) error + type NoopLogger struct + func (l *NoopLogger) Debugf(_ string, _ ...interface{}) + func (l *NoopLogger) Errorf(_ string, _ ...interface{}) + func (l *NoopLogger) Info(_ string) + func (l *NoopLogger) Infof(_ string, _ ...interface{}) + func (l *NoopLogger) Warnf(_ string, _ ...interface{}) + type NotificationService interface + NotifyDLQItemAdded func(ctx context.Context, dlq model.DeadLetterQueue) error + NotifyDeliveryFailure func(ctx context.Context, queue *model.Queue, err error) error + NotifySubscriptionCreated func(ctx context.Context, subscription model.Subscription) error + NotifySubscriptionDeactivated func(ctx context.Context, subscription model.Subscription) error + type Option func(*QueueWorker) error + func WithBatchSize(size int) Option + func WithDelivery(transmitterProvider TransmitterProvider, gateway MessageDeliveryGateway) Option + func WithLogger(logger Logger) Option + func WithNotifications(service NotificationService) Option + func WithRepositories(queueRepo QueueRepository, messageRepo MessageRepository, ...) Option + func WithRetryStrategy(strategy retry.Strategy) Option + type PublishRequest struct + Data string + Identifier string + TopicCode string + type PublishResult struct + MessageID int64 + QueueItemsCreated int + SubscriptionsIDs []int64 + type Publisher struct + func NewPublisher(opts ...PublisherOption) (*Publisher, error) + func (p *Publisher) Publish(ctx context.Context, req PublishRequest) (*PublishResult, error) + func (p *Publisher) PublishBatch(ctx context.Context, requests []PublishRequest) ([]*PublishResult, error) + type PublisherOption func(*Publisher) error + func WithPublisherLogger(logger Logger) PublisherOption + func WithPublisherRepositories(messageRepo MessageRepository, queueRepo QueueRepository, ...) PublisherOption + type PublisherRepository interface + GetByPublisherCode func(ctx context.Context, publisherCode string) (model.Publisher, error) + Load func(ctx context.Context, id int64) (model.Publisher, error) + Save func(ctx context.Context, m model.Publisher) (model.Publisher, error) + type QueueRepository interface + Delete func(ctx context.Context, m *model.Queue) error + FindByMessageID func(ctx context.Context, subscriptionID, messageID int64) (model.Queue, error) + FindBySubscriptionID func(ctx context.Context, subscriptionID int64) ([]model.Queue, error) + FindExpiredItems func(ctx context.Context, limit int) ([]model.Queue, error) + FindPendingItems func(ctx context.Context, limit int) ([]model.Queue, error) + FindRetryableItems func(ctx context.Context, limit int) ([]model.Queue, error) + Load func(ctx context.Context, id int64) (model.Queue, error) + Save func(ctx context.Context, m *model.Queue) (*model.Queue, error) + UpdateNextRetry func(ctx context.Context, id int64, nextRetryAt time.Time, attemptCount int) error + type QueueWorker struct + func NewQueueWorker(opts ...Option) (*QueueWorker, error) + func (w *QueueWorker) CleanupExpiredItems(ctx context.Context) (int, error) + func (w *QueueWorker) GetDLQStats(ctx context.Context) (model.DLQStats, error) + func (w *QueueWorker) GetRetrySchedule() string + func (w *QueueWorker) ProcessPendingItems(ctx context.Context) (int, error) + func (w *QueueWorker) ProcessRetryableItems(ctx context.Context) (int, error) + func (w *QueueWorker) Run(ctx context.Context, interval time.Duration) + type SubscribeRequest struct + CallbackURL string + Identifier string + SubscriberID int64 + TopicCode string + type SubscriberRepository interface + FindByClientID func(ctx context.Context, clientID int64) (model.Subscriber, error) + FindByName func(ctx context.Context, name string) (model.Subscriber, error) + Load func(ctx context.Context, id int64) (model.Subscriber, error) + Save func(ctx context.Context, m model.Subscriber) (model.Subscriber, error) + type SubscriptionManager struct + func NewSubscriptionManager(opts ...SubscriptionManagerOption) (*SubscriptionManager, error) + func (sm *SubscriptionManager) GetSubscription(ctx context.Context, subscriptionID int64) (*model.Subscription, error) + func (sm *SubscriptionManager) ListSubscriptions(ctx context.Context, subscriberID int64, identifier string) ([]model.Subscription, error) + func (sm *SubscriptionManager) ReactivateSubscription(ctx context.Context, subscriptionID int64) (*model.Subscription, error) + func (sm *SubscriptionManager) Subscribe(ctx context.Context, req SubscribeRequest) (*model.Subscription, error) + func (sm *SubscriptionManager) Unsubscribe(ctx context.Context, subscriptionID int64) (*model.Subscription, error) + type SubscriptionManagerOption func(*SubscriptionManager) error + func WithSubscriptionManagerLogger(logger Logger) SubscriptionManagerOption + func WithSubscriptionManagerRepositories(subscriptionRepo SubscriptionRepository, subscriberRepo SubscriberRepository, ...) SubscriptionManagerOption + type SubscriptionRepository interface + FindActive func(ctx context.Context, subscriberID int64, identifier string) ([]model.Subscription, error) + FindAllActive func(ctx context.Context) ([]model.SubscriptionFull, error) + List func(ctx context.Context, filter Filter) ([]model.Subscription, error) + Load func(ctx context.Context, id int64) (model.Subscription, error) + Save func(ctx context.Context, m model.Subscription) (model.Subscription, error) + type TopicRepository interface + GetByTopicCode func(ctx context.Context, topicCode string) (model.Topic, error) + Load func(ctx context.Context, id int64) (model.Topic, error) + Save func(ctx context.Context, m model.Topic) (model.Topic, error) + type TransmitterProvider interface + GetCallbackUrl func(ctx context.Context, subscriberID int64) (string, error)