relica

package
v0.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 24, 2025 License: MIT Imports: 7 Imported by: 0

Documentation

Overview

Package relica provides repository implementations using Relica query builder.

Relica (github.com/coregx/relica) is a lightweight, type-safe database query builder for Go with zero production dependencies.

This package provides production-ready implementations of all pubsub repository interfaces:

  • QueueRepository
  • MessageRepository
  • SubscriptionRepository
  • DLQRepository
  • PublisherRepository
  • SubscriberRepository
  • TopicRepository

Example usage:

import (
    "database/sql"
    "github.com/coregx/pubsub"
    "github.com/coregx/pubsub/adapters/relica"
    _ "github.com/go-sql-driver/mysql"
)

// Open database connection
db, err := sql.Open("mysql", "user:pass@tcp(localhost:3306)/pubsub_db?parseTime=true")
if err != nil {
    log.Fatal(err)
}

// Create repositories (driverName should be "mysql", "postgres", or "sqlite3")
repos := relica.NewRepositories(db, "mysql")

// Create services
worker, err := pubsub.NewQueueWorker(
    pubsub.WithRepositories(repos.Queue, repos.Message, repos.Subscription, repos.DLQ),
    pubsub.WithDelivery(transmitterProvider, gateway),
    pubsub.WithLogger(logger),
)

Package relica provides Relica ORM implementations for PubSub repositories.

Package relica provides Relica ORM implementations for PubSub repositories.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type DLQRepository

type DLQRepository struct {
	// contains filtered or unexported fields
}

DLQRepository implements pubsub.DLQRepository using Relica ORM.

func NewDLQRepository

func NewDLQRepository(sqlDB *sql.DB, driverName string) *DLQRepository

NewDLQRepository creates a new DLQRepository with default table prefix.

func NewDLQRepositoryWithPrefix

func NewDLQRepositoryWithPrefix(sqlDB *sql.DB, driverName, prefix string) *DLQRepository

NewDLQRepositoryWithPrefix creates a new DLQRepository with custom table prefix.

func (*DLQRepository) CountUnresolved

func (r *DLQRepository) CountUnresolved(ctx context.Context) (int, error)

CountUnresolved returns the count of unresolved DLQ items.

func (*DLQRepository) Delete

Delete removes a DLQ item.

func (*DLQRepository) FindByMessageID

func (r *DLQRepository) FindByMessageID(ctx context.Context, messageID int64) (model.DeadLetterQueue, error)

FindByMessageID retrieves a DLQ item for a specific message.

func (*DLQRepository) FindBySubscription

func (r *DLQRepository) FindBySubscription(ctx context.Context, subscriptionID int64, limit int) ([]model.DeadLetterQueue, error)

FindBySubscription retrieves DLQ items for a specific subscription.

func (*DLQRepository) FindOlderThan

func (r *DLQRepository) FindOlderThan(ctx context.Context, threshold time.Duration, limit int) ([]model.DeadLetterQueue, error)

FindOlderThan retrieves DLQ items older than the specified threshold.

func (*DLQRepository) FindUnresolved

func (r *DLQRepository) FindUnresolved(ctx context.Context, limit int) ([]model.DeadLetterQueue, error)

FindUnresolved retrieves unresolved DLQ items.

func (*DLQRepository) GetStats

func (r *DLQRepository) GetStats(ctx context.Context) (model.DLQStats, error)

GetStats retrieves DLQ statistics.

func (*DLQRepository) Load

Load retrieves a DLQ item by ID.

func (*DLQRepository) Save

Save creates or updates a DLQ item.

type MessageRepository

type MessageRepository struct {
	// contains filtered or unexported fields
}

MessageRepository implements pubsub.MessageRepository using Relica.

func NewMessageRepository

func NewMessageRepository(sqlDB *sql.DB, driverName string) *MessageRepository

NewMessageRepository creates a new MessageRepository with default table prefix.

func NewMessageRepositoryWithPrefix

func NewMessageRepositoryWithPrefix(sqlDB *sql.DB, driverName, prefix string) *MessageRepository

NewMessageRepositoryWithPrefix creates a new MessageRepository with custom table prefix.

func (*MessageRepository) Delete

func (r *MessageRepository) Delete(ctx context.Context, m model.Message) error

Delete removes a message.

func (*MessageRepository) FindOutdatedMessages

func (r *MessageRepository) FindOutdatedMessages(ctx context.Context, days int) ([]model.Message, error)

FindOutdatedMessages finds messages older than the specified number of days.

func (*MessageRepository) Load

Load retrieves a message by ID.

func (*MessageRepository) Save

Save creates or updates a message.

type PublisherRepository

type PublisherRepository struct {
	// contains filtered or unexported fields
}

PublisherRepository implements pubsub.PublisherRepository using Relica ORM.

func NewPublisherRepository

func NewPublisherRepository(sqlDB *sql.DB, driverName string) *PublisherRepository

NewPublisherRepository creates a new PublisherRepository with default table prefix.

func NewPublisherRepositoryWithPrefix

func NewPublisherRepositoryWithPrefix(sqlDB *sql.DB, driverName, prefix string) *PublisherRepository

NewPublisherRepositoryWithPrefix creates a new PublisherRepository with custom table prefix.

func (*PublisherRepository) GetByPublisherCode

func (r *PublisherRepository) GetByPublisherCode(ctx context.Context, publisherCode string) (model.Publisher, error)

GetByPublisherCode retrieves a publisher by its unique code.

func (*PublisherRepository) Load

Load retrieves a publisher by ID.

func (*PublisherRepository) Save

Save creates or updates a publisher.

type QueueRepository

type QueueRepository struct {
	// contains filtered or unexported fields
}

QueueRepository implements pubsub.QueueRepository using Relica.

func NewQueueRepository

func NewQueueRepository(sqlDB *sql.DB, driverName string) *QueueRepository

NewQueueRepository creates a new QueueRepository with default table prefix.

func NewQueueRepositoryWithPrefix

func NewQueueRepositoryWithPrefix(sqlDB *sql.DB, driverName, prefix string) *QueueRepository

NewQueueRepositoryWithPrefix creates a new QueueRepository with custom table prefix.

func (*QueueRepository) Delete

func (r *QueueRepository) Delete(ctx context.Context, m *model.Queue) error

Delete removes a queue item.

func (*QueueRepository) FindByMessageID

func (r *QueueRepository) FindByMessageID(ctx context.Context, subscriptionID, messageID int64) (model.Queue, error)

FindByMessageID retrieves a queue item by message and subscription IDs.

func (*QueueRepository) FindBySubscriptionID

func (r *QueueRepository) FindBySubscriptionID(ctx context.Context, subscriptionID int64) ([]model.Queue, error)

FindBySubscriptionID retrieves all queue items for a subscription.

func (*QueueRepository) FindExpiredItems

func (r *QueueRepository) FindExpiredItems(ctx context.Context, limit int) ([]model.Queue, error)

FindExpiredItems retrieves expired queue items that should be cleaned up.

func (*QueueRepository) FindPendingItems

func (r *QueueRepository) FindPendingItems(ctx context.Context, limit int) ([]model.Queue, error)

FindPendingItems retrieves pending queue items ready for first delivery.

func (*QueueRepository) FindRetryableItems

func (r *QueueRepository) FindRetryableItems(ctx context.Context, limit int) ([]model.Queue, error)

FindRetryableItems retrieves failed queue items ready for retry.

func (*QueueRepository) Load

func (r *QueueRepository) Load(ctx context.Context, id int64) (model.Queue, error)

Load retrieves a queue item by ID.

func (*QueueRepository) Save

func (r *QueueRepository) Save(ctx context.Context, m *model.Queue) (*model.Queue, error)

Save creates or updates a queue item.

func (*QueueRepository) UpdateNextRetry

func (r *QueueRepository) UpdateNextRetry(ctx context.Context, id int64, nextRetryAt time.Time, attemptCount int) error

UpdateNextRetry updates the next retry time and attempt count.

type Repositories

Repositories holds all repository implementations.

func NewRepositories

func NewRepositories(db *sql.DB, driverName string) *Repositories

NewRepositories creates all repository implementations using Relica.

The db parameter should be an *sql.DB connected to MySQL, PostgreSQL, or SQLite. The driverName should be "mysql", "postgres", or "sqlite3". The table prefix defaults to "pubsub_" but can be customized.

func NewRepositoriesWithPrefix

func NewRepositoriesWithPrefix(db *sql.DB, driverName, prefix string) *Repositories

NewRepositoriesWithPrefix creates all repository implementations with a custom table prefix.

type SubscriberRepository

type SubscriberRepository struct {
	// contains filtered or unexported fields
}

SubscriberRepository implements pubsub.SubscriberRepository using Relica ORM.

func NewSubscriberRepository

func NewSubscriberRepository(sqlDB *sql.DB, driverName string) *SubscriberRepository

NewSubscriberRepository creates a new SubscriberRepository with default table prefix.

func NewSubscriberRepositoryWithPrefix

func NewSubscriberRepositoryWithPrefix(sqlDB *sql.DB, driverName, prefix string) *SubscriberRepository

NewSubscriberRepositoryWithPrefix creates a new SubscriberRepository with custom table prefix.

func (*SubscriberRepository) FindByClientID

func (r *SubscriberRepository) FindByClientID(ctx context.Context, clientID int64) (model.Subscriber, error)

FindByClientID retrieves a subscriber by client ID.

func (*SubscriberRepository) FindByName

func (r *SubscriberRepository) FindByName(ctx context.Context, name string) (model.Subscriber, error)

FindByName retrieves a subscriber by name.

func (*SubscriberRepository) Load

Load retrieves a subscriber by ID.

func (*SubscriberRepository) Save

Save creates or updates a subscriber.

type SubscriptionRepository

type SubscriptionRepository struct {
	// contains filtered or unexported fields
}

SubscriptionRepository implements pubsub.SubscriptionRepository using Relica ORM.

func NewSubscriptionRepository

func NewSubscriptionRepository(sqlDB *sql.DB, driverName string) *SubscriptionRepository

NewSubscriptionRepository creates a new SubscriptionRepository with default table prefix.

func NewSubscriptionRepositoryWithPrefix

func NewSubscriptionRepositoryWithPrefix(sqlDB *sql.DB, driverName, prefix string) *SubscriptionRepository

NewSubscriptionRepositoryWithPrefix creates a new SubscriptionRepository with custom table prefix.

func (*SubscriptionRepository) FindActive

func (r *SubscriptionRepository) FindActive(ctx context.Context, subscriberID int64, identifier string) ([]model.Subscription, error)

FindActive finds active subscriptions matching the criteria.

func (*SubscriptionRepository) FindAllActive

FindAllActive retrieves all active subscriptions with full details.

func (*SubscriptionRepository) List

List retrieves subscriptions matching the filter criteria.

func (*SubscriptionRepository) Load

Load retrieves a subscription by ID.

func (*SubscriptionRepository) Save

Save creates or updates a subscription.

type TopicRepository

type TopicRepository struct {
	// contains filtered or unexported fields
}

TopicRepository implements pubsub.TopicRepository using Relica ORM.

func NewTopicRepository

func NewTopicRepository(sqlDB *sql.DB, driverName string) *TopicRepository

NewTopicRepository creates a new TopicRepository with default table prefix.

func NewTopicRepositoryWithPrefix

func NewTopicRepositoryWithPrefix(sqlDB *sql.DB, driverName, prefix string) *TopicRepository

NewTopicRepositoryWithPrefix creates a new TopicRepository with custom table prefix.

func (*TopicRepository) GetByTopicCode

func (r *TopicRepository) GetByTopicCode(ctx context.Context, topicCode string) (model.Topic, error)

GetByTopicCode retrieves a topic by its unique code.

func (*TopicRepository) Load

func (r *TopicRepository) Load(ctx context.Context, id int64) (model.Topic, error)

Load retrieves a topic by ID.

func (*TopicRepository) Save

Save creates or updates a topic.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL