Documentation
¶
Overview ¶
Package relica provides repository implementations using Relica query builder.
Relica (github.com/coregx/relica) is a lightweight, type-safe database query builder for Go with zero production dependencies.
This package provides production-ready implementations of all pubsub repository interfaces:
- QueueRepository
- MessageRepository
- SubscriptionRepository
- DLQRepository
- PublisherRepository
- SubscriberRepository
- TopicRepository
Example usage:
import (
"database/sql"
"github.com/coregx/pubsub"
"github.com/coregx/pubsub/adapters/relica"
_ "github.com/go-sql-driver/mysql"
)
// Open database connection
db, err := sql.Open("mysql", "user:pass@tcp(localhost:3306)/pubsub_db?parseTime=true")
if err != nil {
log.Fatal(err)
}
// Create repositories (driverName should be "mysql", "postgres", or "sqlite3")
repos := relica.NewRepositories(db, "mysql")
// Create services
worker, err := pubsub.NewQueueWorker(
pubsub.WithRepositories(repos.Queue, repos.Message, repos.Subscription, repos.DLQ),
pubsub.WithDelivery(transmitterProvider, gateway),
pubsub.WithLogger(logger),
)
Package relica provides Relica ORM implementations for PubSub repositories.
Package relica provides Relica ORM implementations for PubSub repositories.
Index ¶
- type DLQRepository
- func (r *DLQRepository) CountUnresolved(ctx context.Context) (int, error)
- func (r *DLQRepository) Delete(ctx context.Context, m model.DeadLetterQueue) error
- func (r *DLQRepository) FindByMessageID(ctx context.Context, messageID int64) (model.DeadLetterQueue, error)
- func (r *DLQRepository) FindBySubscription(ctx context.Context, subscriptionID int64, limit int) ([]model.DeadLetterQueue, error)
- func (r *DLQRepository) FindOlderThan(ctx context.Context, threshold time.Duration, limit int) ([]model.DeadLetterQueue, error)
- func (r *DLQRepository) FindUnresolved(ctx context.Context, limit int) ([]model.DeadLetterQueue, error)
- func (r *DLQRepository) GetStats(ctx context.Context) (model.DLQStats, error)
- func (r *DLQRepository) Load(ctx context.Context, id int64) (model.DeadLetterQueue, error)
- func (r *DLQRepository) Save(ctx context.Context, m model.DeadLetterQueue) (model.DeadLetterQueue, error)
- type MessageRepository
- func (r *MessageRepository) Delete(ctx context.Context, m model.Message) error
- func (r *MessageRepository) FindOutdatedMessages(ctx context.Context, days int) ([]model.Message, error)
- func (r *MessageRepository) Load(ctx context.Context, id int64) (model.Message, error)
- func (r *MessageRepository) Save(ctx context.Context, m model.Message) (model.Message, error)
- type PublisherRepository
- func (r *PublisherRepository) GetByPublisherCode(ctx context.Context, publisherCode string) (model.Publisher, error)
- func (r *PublisherRepository) Load(ctx context.Context, id int64) (model.Publisher, error)
- func (r *PublisherRepository) Save(ctx context.Context, m model.Publisher) (model.Publisher, error)
- type QueueRepository
- func (r *QueueRepository) Delete(ctx context.Context, m *model.Queue) error
- func (r *QueueRepository) FindByMessageID(ctx context.Context, subscriptionID, messageID int64) (model.Queue, error)
- func (r *QueueRepository) FindBySubscriptionID(ctx context.Context, subscriptionID int64) ([]model.Queue, error)
- func (r *QueueRepository) FindExpiredItems(ctx context.Context, limit int) ([]model.Queue, error)
- func (r *QueueRepository) FindPendingItems(ctx context.Context, limit int) ([]model.Queue, error)
- func (r *QueueRepository) FindRetryableItems(ctx context.Context, limit int) ([]model.Queue, error)
- func (r *QueueRepository) Load(ctx context.Context, id int64) (model.Queue, error)
- func (r *QueueRepository) Save(ctx context.Context, m *model.Queue) (*model.Queue, error)
- func (r *QueueRepository) UpdateNextRetry(ctx context.Context, id int64, nextRetryAt time.Time, attemptCount int) error
- type Repositories
- type SubscriberRepository
- func (r *SubscriberRepository) FindByClientID(ctx context.Context, clientID int64) (model.Subscriber, error)
- func (r *SubscriberRepository) FindByName(ctx context.Context, name string) (model.Subscriber, error)
- func (r *SubscriberRepository) Load(ctx context.Context, id int64) (model.Subscriber, error)
- func (r *SubscriberRepository) Save(ctx context.Context, m model.Subscriber) (model.Subscriber, error)
- type SubscriptionRepository
- func (r *SubscriptionRepository) FindActive(ctx context.Context, subscriberID int64, identifier string) ([]model.Subscription, error)
- func (r *SubscriptionRepository) FindAllActive(ctx context.Context) ([]model.SubscriptionFull, error)
- func (r *SubscriptionRepository) List(ctx context.Context, filter pubsub.Filter) ([]model.Subscription, error)
- func (r *SubscriptionRepository) Load(ctx context.Context, id int64) (model.Subscription, error)
- func (r *SubscriptionRepository) Save(ctx context.Context, m model.Subscription) (model.Subscription, error)
- type TopicRepository
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type DLQRepository ¶
type DLQRepository struct {
// contains filtered or unexported fields
}
DLQRepository implements pubsub.DLQRepository using Relica ORM.
func NewDLQRepository ¶
func NewDLQRepository(sqlDB *sql.DB, driverName string) *DLQRepository
NewDLQRepository creates a new DLQRepository with default table prefix.
func NewDLQRepositoryWithPrefix ¶
func NewDLQRepositoryWithPrefix(sqlDB *sql.DB, driverName, prefix string) *DLQRepository
NewDLQRepositoryWithPrefix creates a new DLQRepository with custom table prefix.
func (*DLQRepository) CountUnresolved ¶
func (r *DLQRepository) CountUnresolved(ctx context.Context) (int, error)
CountUnresolved returns the count of unresolved DLQ items.
func (*DLQRepository) Delete ¶
func (r *DLQRepository) Delete(ctx context.Context, m model.DeadLetterQueue) error
Delete removes a DLQ item.
func (*DLQRepository) FindByMessageID ¶
func (r *DLQRepository) FindByMessageID(ctx context.Context, messageID int64) (model.DeadLetterQueue, error)
FindByMessageID retrieves a DLQ item for a specific message.
func (*DLQRepository) FindBySubscription ¶
func (r *DLQRepository) FindBySubscription(ctx context.Context, subscriptionID int64, limit int) ([]model.DeadLetterQueue, error)
FindBySubscription retrieves DLQ items for a specific subscription.
func (*DLQRepository) FindOlderThan ¶
func (r *DLQRepository) FindOlderThan(ctx context.Context, threshold time.Duration, limit int) ([]model.DeadLetterQueue, error)
FindOlderThan retrieves DLQ items older than the specified threshold.
func (*DLQRepository) FindUnresolved ¶
func (r *DLQRepository) FindUnresolved(ctx context.Context, limit int) ([]model.DeadLetterQueue, error)
FindUnresolved retrieves unresolved DLQ items.
func (*DLQRepository) Load ¶
func (r *DLQRepository) Load(ctx context.Context, id int64) (model.DeadLetterQueue, error)
Load retrieves a DLQ item by ID.
func (*DLQRepository) Save ¶
func (r *DLQRepository) Save(ctx context.Context, m model.DeadLetterQueue) (model.DeadLetterQueue, error)
Save creates or updates a DLQ item.
type MessageRepository ¶
type MessageRepository struct {
// contains filtered or unexported fields
}
MessageRepository implements pubsub.MessageRepository using Relica.
func NewMessageRepository ¶
func NewMessageRepository(sqlDB *sql.DB, driverName string) *MessageRepository
NewMessageRepository creates a new MessageRepository with default table prefix.
func NewMessageRepositoryWithPrefix ¶
func NewMessageRepositoryWithPrefix(sqlDB *sql.DB, driverName, prefix string) *MessageRepository
NewMessageRepositoryWithPrefix creates a new MessageRepository with custom table prefix.
func (*MessageRepository) FindOutdatedMessages ¶
func (r *MessageRepository) FindOutdatedMessages(ctx context.Context, days int) ([]model.Message, error)
FindOutdatedMessages finds messages older than the specified number of days.
type PublisherRepository ¶
type PublisherRepository struct {
// contains filtered or unexported fields
}
PublisherRepository implements pubsub.PublisherRepository using Relica ORM.
func NewPublisherRepository ¶
func NewPublisherRepository(sqlDB *sql.DB, driverName string) *PublisherRepository
NewPublisherRepository creates a new PublisherRepository with default table prefix.
func NewPublisherRepositoryWithPrefix ¶
func NewPublisherRepositoryWithPrefix(sqlDB *sql.DB, driverName, prefix string) *PublisherRepository
NewPublisherRepositoryWithPrefix creates a new PublisherRepository with custom table prefix.
func (*PublisherRepository) GetByPublisherCode ¶
func (r *PublisherRepository) GetByPublisherCode(ctx context.Context, publisherCode string) (model.Publisher, error)
GetByPublisherCode retrieves a publisher by its unique code.
type QueueRepository ¶
type QueueRepository struct {
// contains filtered or unexported fields
}
QueueRepository implements pubsub.QueueRepository using Relica.
func NewQueueRepository ¶
func NewQueueRepository(sqlDB *sql.DB, driverName string) *QueueRepository
NewQueueRepository creates a new QueueRepository with default table prefix.
func NewQueueRepositoryWithPrefix ¶
func NewQueueRepositoryWithPrefix(sqlDB *sql.DB, driverName, prefix string) *QueueRepository
NewQueueRepositoryWithPrefix creates a new QueueRepository with custom table prefix.
func (*QueueRepository) FindByMessageID ¶
func (r *QueueRepository) FindByMessageID(ctx context.Context, subscriptionID, messageID int64) (model.Queue, error)
FindByMessageID retrieves a queue item by message and subscription IDs.
func (*QueueRepository) FindBySubscriptionID ¶
func (r *QueueRepository) FindBySubscriptionID(ctx context.Context, subscriptionID int64) ([]model.Queue, error)
FindBySubscriptionID retrieves all queue items for a subscription.
func (*QueueRepository) FindExpiredItems ¶
FindExpiredItems retrieves expired queue items that should be cleaned up.
func (*QueueRepository) FindPendingItems ¶
FindPendingItems retrieves pending queue items ready for first delivery.
func (*QueueRepository) FindRetryableItems ¶
FindRetryableItems retrieves failed queue items ready for retry.
func (*QueueRepository) UpdateNextRetry ¶
func (r *QueueRepository) UpdateNextRetry(ctx context.Context, id int64, nextRetryAt time.Time, attemptCount int) error
UpdateNextRetry updates the next retry time and attempt count.
type Repositories ¶
type Repositories struct {
Queue pubsub.QueueRepository
Message pubsub.MessageRepository
Subscription pubsub.SubscriptionRepository
DLQ pubsub.DLQRepository
Publisher pubsub.PublisherRepository
Subscriber pubsub.SubscriberRepository
Topic pubsub.TopicRepository
}
Repositories holds all repository implementations.
func NewRepositories ¶
func NewRepositories(db *sql.DB, driverName string) *Repositories
NewRepositories creates all repository implementations using Relica.
The db parameter should be an *sql.DB connected to MySQL, PostgreSQL, or SQLite. The driverName should be "mysql", "postgres", or "sqlite3". The table prefix defaults to "pubsub_" but can be customized.
func NewRepositoriesWithPrefix ¶
func NewRepositoriesWithPrefix(db *sql.DB, driverName, prefix string) *Repositories
NewRepositoriesWithPrefix creates all repository implementations with a custom table prefix.
type SubscriberRepository ¶
type SubscriberRepository struct {
// contains filtered or unexported fields
}
SubscriberRepository implements pubsub.SubscriberRepository using Relica ORM.
func NewSubscriberRepository ¶
func NewSubscriberRepository(sqlDB *sql.DB, driverName string) *SubscriberRepository
NewSubscriberRepository creates a new SubscriberRepository with default table prefix.
func NewSubscriberRepositoryWithPrefix ¶
func NewSubscriberRepositoryWithPrefix(sqlDB *sql.DB, driverName, prefix string) *SubscriberRepository
NewSubscriberRepositoryWithPrefix creates a new SubscriberRepository with custom table prefix.
func (*SubscriberRepository) FindByClientID ¶
func (r *SubscriberRepository) FindByClientID(ctx context.Context, clientID int64) (model.Subscriber, error)
FindByClientID retrieves a subscriber by client ID.
func (*SubscriberRepository) FindByName ¶
func (r *SubscriberRepository) FindByName(ctx context.Context, name string) (model.Subscriber, error)
FindByName retrieves a subscriber by name.
func (*SubscriberRepository) Load ¶
func (r *SubscriberRepository) Load(ctx context.Context, id int64) (model.Subscriber, error)
Load retrieves a subscriber by ID.
func (*SubscriberRepository) Save ¶
func (r *SubscriberRepository) Save(ctx context.Context, m model.Subscriber) (model.Subscriber, error)
Save creates or updates a subscriber.
type SubscriptionRepository ¶
type SubscriptionRepository struct {
// contains filtered or unexported fields
}
SubscriptionRepository implements pubsub.SubscriptionRepository using Relica ORM.
func NewSubscriptionRepository ¶
func NewSubscriptionRepository(sqlDB *sql.DB, driverName string) *SubscriptionRepository
NewSubscriptionRepository creates a new SubscriptionRepository with default table prefix.
func NewSubscriptionRepositoryWithPrefix ¶
func NewSubscriptionRepositoryWithPrefix(sqlDB *sql.DB, driverName, prefix string) *SubscriptionRepository
NewSubscriptionRepositoryWithPrefix creates a new SubscriptionRepository with custom table prefix.
func (*SubscriptionRepository) FindActive ¶
func (r *SubscriptionRepository) FindActive(ctx context.Context, subscriberID int64, identifier string) ([]model.Subscription, error)
FindActive finds active subscriptions matching the criteria.
func (*SubscriptionRepository) FindAllActive ¶
func (r *SubscriptionRepository) FindAllActive(ctx context.Context) ([]model.SubscriptionFull, error)
FindAllActive retrieves all active subscriptions with full details.
func (*SubscriptionRepository) List ¶
func (r *SubscriptionRepository) List(ctx context.Context, filter pubsub.Filter) ([]model.Subscription, error)
List retrieves subscriptions matching the filter criteria.
func (*SubscriptionRepository) Load ¶
func (r *SubscriptionRepository) Load(ctx context.Context, id int64) (model.Subscription, error)
Load retrieves a subscription by ID.
func (*SubscriptionRepository) Save ¶
func (r *SubscriptionRepository) Save(ctx context.Context, m model.Subscription) (model.Subscription, error)
Save creates or updates a subscription.
type TopicRepository ¶
type TopicRepository struct {
// contains filtered or unexported fields
}
TopicRepository implements pubsub.TopicRepository using Relica ORM.
func NewTopicRepository ¶
func NewTopicRepository(sqlDB *sql.DB, driverName string) *TopicRepository
NewTopicRepository creates a new TopicRepository with default table prefix.
func NewTopicRepositoryWithPrefix ¶
func NewTopicRepositoryWithPrefix(sqlDB *sql.DB, driverName, prefix string) *TopicRepository
NewTopicRepositoryWithPrefix creates a new TopicRepository with custom table prefix.
func (*TopicRepository) GetByTopicCode ¶
func (r *TopicRepository) GetByTopicCode(ctx context.Context, topicCode string) (model.Topic, error)
GetByTopicCode retrieves a topic by its unique code.