model

package
v0.1.0
Warning

This package is not in the latest version of its module.

Published: Nov 24, 2025 License: MIT Imports: 2 Imported by: 0

Documentation

Overview

Package model contains all domain models and data structures for the PubSub system.

Constants

This section is empty.

Variables

var (
	// ErrQueueItemExpired indicates the queue item has passed its expiration time.
	ErrQueueItemExpired = DomainError{Code: "QUEUE_EXPIRED", Message: "Queue item has expired"}

	// ErrQueueItemAlreadySent indicates the message was already successfully delivered.
	ErrQueueItemAlreadySent = DomainError{Code: "ALREADY_SENT", Message: "Queue item already sent"}

	// ErrMaxAttemptsExceeded indicates the item has reached the maximum retry attempts.
	ErrMaxAttemptsExceeded = DomainError{Code: "MAX_ATTEMPTS", Message: "Maximum delivery attempts exceeded"}

	// ErrNotReadyForRetry indicates the retry delay hasn't elapsed yet.
	ErrNotReadyForRetry = DomainError{Code: "NOT_READY", Message: "Not ready for retry yet"}

	// ErrNoRetryScheduled indicates no retry time has been set for this item.
	ErrNoRetryScheduled = DomainError{Code: "NO_RETRY", Message: "No retry scheduled"}
)

Domain errors returned by Queue business logic methods.

Functions

This section is empty.

Types

type Attributes

type Attributes map[string]string

Attributes represents a map of key-value pairs for message metadata.

type DLQStats

type DLQStats struct {
	TotalItems       int       `json:"totalItems"`
	UnresolvedItems  int       `json:"unresolvedItems"`
	ResolvedItems    int       `json:"resolvedItems"`
	OldestItemAge    int64     `json:"oldestItemAge"` // Seconds
	NewestItemAge    int64     `json:"newestItemAge"` // Seconds
	TopFailureReason string    `json:"topFailureReason"`
	LastUpdated      time.Time `json:"lastUpdated"`
}

DLQStats represents aggregate statistics for the Dead Letter Queue. Used for monitoring dashboards and operational visibility.

type DataMessage

type DataMessage struct {
	MessageID   string     `json:"messageID"`
	PublishTime time.Time  `json:"publishTime"`
	OrderingKey string     `json:"orderingKey"`
	Attributes  Attributes `json:"attributes"`
	Data        string     `json:"data"`
	Identifier  string
}

DataMessage represents a message with metadata for delivery to subscribers.

func NewDataMessage

func NewDataMessage(messageID string, _ time.Time, identifier, data string) *DataMessage

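NewDataMessage creates a new DataMessage from the given message ID, identifier, and data. The time.Time argument is bound to the blank identifier in the signature and is therefore ignored.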

func (*DataMessage) FromString

func (d *DataMessage) FromString(_ string) error

FromString parses message data from a string (currently a no-op).

func (*DataMessage) ToBase64

func (d *DataMessage) ToBase64() string

ToBase64 returns the message data as a base64 string (currently returns empty).

func (*DataMessage) ToString

func (d *DataMessage) ToString() string

ToString returns the message data as a string.

type DeadLetterQueue

type DeadLetterQueue struct {
	ID              int64 `json:"id"`
	SubscriptionID  int64 `json:"subscriptionID" db:"subscription_id"`
	MessageID       int64 `json:"messageID" db:"message_id"`
	OriginalQueueID int64 `json:"originalQueueId" db:"original_queue_id"` // Reference to original queue item

	// Failure information
	AttemptCount  int    `json:"attemptCount" db:"attempt_count"`   // Total attempts before DLQ
	LastError     string `json:"lastError" db:"last_error"`         // Last error message
	FailureReason string `json:"failureReason" db:"failure_reason"` // Reason for moving to DLQ

	// Timing information
	FirstAttemptAt time.Time `json:"firstAttemptAt" db:"first_attempt_at"` // When first delivery was attempted
	LastAttemptAt  time.Time `json:"lastAttemptAt" db:"last_attempt_at"`   // When last attempt failed
	MovedToDLQAt   time.Time `json:"movedToDlqAt" db:"moved_to_dlq_at"`    // When moved to DLQ

	// Message data (denormalized for easy access)
	MessageData string `json:"messageData" db:"message_data"` // Original message payload
	CallbackURL string `json:"callbackURL" db:"callback_url"` // Target webhook URL

	// Lifecycle
	IsResolved     bool       `json:"isResolved" db:"is_resolved"`         // Manual resolution flag
	ResolvedAt     *time.Time `json:"resolvedAt" db:"resolved_at"`         // When manually resolved
	ResolvedBy     string     `json:"resolvedBy" db:"resolved_by"`         // Who resolved (user/system)
	ResolutionNote string     `json:"resolutionNote" db:"resolution_note"` // Resolution explanation

	CreatedAt time.Time `json:"createdAt" db:"created_at"`
}

DeadLetterQueue represents a permanently failed message that exceeded the retry threshold. Messages are automatically moved here after exhausting all retry attempts (typically 5-7 attempts).

The DLQ serves as:

  • Failure audit log with full diagnostic information
  • Manual intervention queue for operations teams
  • Source for failure analysis and monitoring

Business logic methods:

  • Resolve: Mark item as manually resolved
  • GetAge: Calculate time in DLQ
  • IsOld: Check if item needs attention

Items remain in DLQ until manually resolved or deleted.

func NewDeadLetterQueue

func NewDeadLetterQueue(
	subscriptionID, messageID, originalQueueID int64,
	attemptCount int,
	lastError, failureReason string,
	firstAttemptAt, lastAttemptAt time.Time,
	messageData, callbackURL string,
) DeadLetterQueue

NewDeadLetterQueue creates a new Dead Letter Queue entry from a failed queue item. This is called automatically by QueueWorker when a queue item exceeds the retry threshold.

Denormalizes message data and callback URL for easy access without joining tables.

func (*DeadLetterQueue) GetAge

func (d *DeadLetterQueue) GetAge() time.Duration

GetAge returns how long the item has been in the Dead Letter Queue.

func (*DeadLetterQueue) IsOld

func (d *DeadLetterQueue) IsOld(threshold time.Duration) bool

IsOld checks if the item has been in DLQ longer than the threshold duration. Used to identify stuck items that need urgent attention.

func (*DeadLetterQueue) Resolve

func (d *DeadLetterQueue) Resolve(resolvedBy, note string)

Resolve marks the DLQ item as manually resolved by an operator. This is typically used after:

  • Manual message replay
  • Fixing the root cause and redelivering
  • Determining the failure is acceptable and can be ignored

Parameters:

  • resolvedBy: Username/system that resolved the item
  • note: Explanation of the resolution action taken

func (DeadLetterQueue) TableName

func (d DeadLetterQueue) TableName() string

TableName returns the database table name for DeadLetterQueue.

type DomainError

type DomainError struct {
	Code    string // Error code for programmatic handling
	Message string // Human-readable error message
}

DomainError represents a domain-level business rule violation. Used by Queue methods to return business logic errors.

func (DomainError) Error

func (e DomainError) Error() string

type Message

type Message struct {
	ID         int64     `json:"id"`         // Unique message ID
	TopicID    int64     `json:"topicID"`    // Topic this message belongs to
	Identifier string    `json:"identifier"` // Event identifier (e.g., "user-123")
	Data       string    `json:"data"`       // Message payload (JSON or string)
	CreatedAt  time.Time `json:"createdAt"`  // Publication timestamp
}

Message represents a published message in the pub/sub system. Messages are immutable once created and contain the actual payload to be delivered.

Each published message creates queue items for all active subscriptions to its topic. Messages are retained for archival/audit purposes even after successful delivery.
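
The fan-out from one message to per-subscription queue items can be sketched as below. All types here are simplified, hypothetical stand-ins. Filtering by exact identifier equality is an assumption; the actual matching rule (including any wildcard handling) is not documented here.

```go
package main

import "fmt"

type message struct {
	ID         int64
	TopicID    int64
	Identifier string
}

type subscription struct {
	ID         int64
	TopicID    int64
	Identifier string
	IsActive   bool
}

type queueItem struct {
	SubscriptionID int64
	MessageID      int64
}

// fanOut returns one queue item per active subscription on the message's
// topic whose identifier equals the message's identifier (assumed rule).
func fanOut(m message, subs []subscription) []queueItem {
	var items []queueItem
	for _, s := range subs {
		if s.IsActive && s.TopicID == m.TopicID && s.Identifier == m.Identifier {
			items = append(items, queueItem{SubscriptionID: s.ID, MessageID: m.ID})
		}
	}
	return items
}

func main() {
	m := message{ID: 1, TopicID: 10, Identifier: "user-123"}
	subs := []subscription{
		{ID: 100, TopicID: 10, Identifier: "user-123", IsActive: true},
		{ID: 101, TopicID: 10, Identifier: "user-123", IsActive: false}, // inactive
		{ID: 102, TopicID: 11, Identifier: "user-123", IsActive: true},  // other topic
	}
	fmt.Println(fanOut(m, subs)) // [{100 1}]
}
```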

func NewMessage

func NewMessage(topicID int64, identifier, data string) Message

NewMessage creates a new message for publication. Messages are immutable after creation.

Parameters:

  • topicID: The topic to publish to
  • identifier: Event identifier for filtering/routing
  • data: Message payload (typically JSON)

func (Message) TableName

func (t Message) TableName() string

TableName returns the database table name for Message.

type NotificationLog

type NotificationLog struct {
	ID             int64          `json:"id"`
	SubscriptionID int64          `json:"subscriptionID"`
	MessageID      int64          `json:"messageID"`
	Identifier     string         `json:"identifier"`
	TopicCode      string         `json:"topicCode"`
	SubscriberType string         `json:"subscriberType"` // "client", "service"
	SubscriberID   int64          `json:"subscriberID"`
	DeliveryMethod string         `json:"deliveryMethod"` // "webhook", "grpc", "pull"
	Status         string         `json:"status"`         // "pending", "sent", "failed", "skipped"
	SkippedReason  sql.NullString `json:"skippedReason"`
	SentAt         sql.NullTime   `json:"sentAt"`
	CreatedAt      time.Time      `json:"createdAt"`
}

NotificationLog records every notification attempt for analytics and debugging.

func NewNotificationLog

func NewNotificationLog(
	subscriptionID int64,
	messageID int64,
	identifier string,
	topicCode string,
	subscriberType string,
	subscriberID int64,
	deliveryMethod string,
	status string,
) NotificationLog

NewNotificationLog creates a new notification log entry.

func (NotificationLog) TableName

func (t NotificationLog) TableName() string

TableName returns the database table name for NotificationLog.

type PublishResult

type PublishResult struct {
	Result bool // True if publish succeeded
}

PublishResult represents the result of a publish operation. It currently contains only a success flag; it can be extended with message IDs and similar fields.

type Publisher

type Publisher struct {
	ID          int64     `json:"id"`                        // Unique publisher ID
	Code        string    `json:"code" db:"publisher_code"`  // Unique publisher code (e.g., "user-service")
	Name        string    `json:"name"`                      // Human-readable publisher name
	Description string    `json:"description"`               // Publisher description
	IsActive    bool      `json:"isActive" db:"is_active"`   // Only active publishers can publish
	CreatedAt   time.Time `json:"createdAt" db:"created_at"` // Publisher registration time
}

Publisher represents a message publisher in the pub/sub system. Publishers are registered services or applications that can publish messages to topics.

Each publisher is identified by a unique code and can be activated/deactivated. Inactive publishers cannot publish new messages.

func NewPublisher

func NewPublisher(code, name, description string) Publisher

NewPublisher creates a new active publisher.

Parameters:

  • code: Unique publisher identifier (e.g., "user-service", "order-processor")
  • name: Human-readable name for display
  • description: Purpose and details about this publisher

func (Publisher) TableName

func (t Publisher) TableName() string

TableName returns the database table name for Publisher.

type Queue

type Queue struct {
	ID                 int64          `json:"id"`
	SubscriptionID     int64          `json:"subscriptionID"`
	MessageID          int64          `json:"messageID"`
	Status             QueueStatus    `json:"status" db:"status"`                          // NEW: from 00019
	AttemptCount       int            `json:"attemptCount" db:"attempt_count"`             // NEW: from 00019
	LastAttemptAt      sql.NullTime   `json:"lastAttemptAt" db:"last_attempt_at"`          // NEW: from 00019
	NextRetryAt        sql.NullTime   `json:"nextRetryAt" db:"next_retry_at"`              // NEW: from 00019
	LastError          sql.NullString `json:"lastError" db:"last_error"`                   // NEW: from 00019
	ExpiresAt          time.Time      `json:"expiresAt" db:"expires_at"`                   // NEW: from 00019
	SequenceNumber     int64          `json:"sequenceNumber" db:"sequence_number"`         // NEW: from 00019
	OperationTimestamp time.Time      `json:"operationTimestamp" db:"operation_timestamp"` // NEW: from 00019
	RetryAt            sql.NullTime   `json:"retryAt"`                                     // LEGACY: keep for backward compatibility
	IsComplete         bool           `json:"isComplete"`                                  // LEGACY: deprecated, use Status
	CompletedAt        sql.NullTime   `json:"completedAt"`
	CreatedAt          time.Time      `json:"createdAt"`
}

Queue represents a message queued for delivery to a subscriber. It contains retry logic state, timing information, and error tracking.

Queue items follow this lifecycle:

  1. Created with status=PENDING
  2. Delivery attempted → either SENT (success) or FAILED (retry)
  3. FAILED items retry with exponential backoff
  4. After exceeding retry threshold → moved to Dead Letter Queue (DLQ)

Business logic methods:

  • MarkSent/MarkFailed: Update status after delivery attempt
  • CanAttemptDelivery: Check if delivery can be attempted
  • ShouldRetry: Check if item is ready for retry
  • ShouldMoveToDLQ: Check if exhausted retries

This model implements Domain-Driven Design with rich business logic.

func NewQueue

func NewQueue(subscriptionID, messageID int64) Queue

NewQueue creates a new queue item for message delivery. Initial state: PENDING, AttemptCount=0, NextRetryAt=now (ready immediately). Default expiry: 24 hours from creation.

func (*Queue) CanAttemptDelivery

func (t *Queue) CanAttemptDelivery(maxAttempts int) error

CanAttemptDelivery validates whether delivery can be attempted based on business rules. Checks expiration, status, max attempts, and retry timing.

Returns error if delivery cannot be attempted:

  • ErrQueueItemExpired: Item has expired
  • ErrQueueItemAlreadySent: Already successfully delivered
  • ErrMaxAttemptsExceeded: Exceeded retry limit
  • ErrNotReadyForRetry: Too soon for retry

func (*Queue) GetAge

func (t *Queue) GetAge() time.Duration

GetAge returns how long the queue item has existed since creation.

func (*Queue) GetTimeUntilExpiry

func (t *Queue) GetTimeUntilExpiry() time.Duration

GetTimeUntilExpiry returns the duration until the item expires. Negative duration means already expired.

func (*Queue) GetTimeUntilRetry

func (t *Queue) GetTimeUntilRetry() (time.Duration, error)

GetTimeUntilRetry returns the duration until the next retry attempt. Returns 0 if ready for retry now, or error if no retry is scheduled.

func (*Queue) IsExpired

func (t *Queue) IsExpired() bool

IsExpired checks if the queue item has passed its expiration time. Expired items are cleaned up by the queue worker.

func (*Queue) MarkFailed

func (t *Queue) MarkFailed(err error, retryAfter time.Duration)

MarkFailed marks the queue item as failed and schedules the next retry attempt. Increments attempt count, records error message, and calculates next retry time.

Parameters:

  • err: The delivery error (stored in LastError)
  • retryAfter: Duration to wait before next retry (exponential backoff)

func (*Queue) MarkSent

func (t *Queue) MarkSent()

MarkSent marks the queue item as successfully delivered. Sets status to SENT and updates timing fields.

func (*Queue) RecordAttemptStart

func (t *Queue) RecordAttemptStart()

RecordAttemptStart marks the beginning of a delivery attempt. Records timing only - attempt count is incremented by MarkFailed or MarkSent.

func (*Queue) SetComplete

func (t *Queue) SetComplete()

SetComplete marks the queue item as complete (deprecated, use MarkSent instead).

func (*Queue) ShouldMoveToDLQ

func (t *Queue) ShouldMoveToDLQ(dlqThreshold int) bool

ShouldMoveToDLQ checks if the item should be moved to the Dead Letter Queue. Returns true when attempt count reaches the DLQ threshold and status is FAILED.

func (*Queue) ShouldRetry

func (t *Queue) ShouldRetry() bool

ShouldRetry checks if the item is ready for a retry attempt. Returns true if status=FAILED, NextRetryAt is set, and that time has passed.

func (*Queue) TableName

func (t *Queue) TableName() string

TableName returns the database table name for Queue.

type QueueStatus

type QueueStatus string

QueueStatus represents the lifecycle state of a queue item.

const (
	// QueueStatusPending indicates the message is awaiting first delivery attempt.
	QueueStatusPending QueueStatus = "pending"

	// QueueStatusSent indicates successful message delivery.
	QueueStatusSent QueueStatus = "sent"

	// QueueStatusFailed indicates delivery failed and item is awaiting retry.
	QueueStatusFailed QueueStatus = "failed"
)

type QueueWithDetails

type QueueWithDetails struct {
	// Core fields
	QueueID        int64  `json:"queueId"`
	MessageID      int64  `json:"messageID"`
	SubscriptionID int64  `json:"subscriptionID"`
	Status         string `json:"status"` // "pending", "sent", "failed"

	// Retry tracking
	AttemptCount  int        `json:"attemptCount"`
	LastAttemptAt *time.Time `json:"lastAttemptAt"`
	NextRetryAt   *time.Time `json:"nextRetryAt"`
	LastError     string     `json:"lastError"`

	// Ordering and TTL
	SequenceNumber int64      `json:"sequenceNumber"`
	OperationTime  time.Time  `json:"operationTime"`
	ExpiresAt      *time.Time `json:"expiresAt"`
	CreatedAt      time.Time  `json:"createdAt"`

	// Message details
	Identifier string `json:"identifier"`
	Data       []byte `json:"data"`

	// Subscription details
	CallbackURL  string `json:"callbackURL"`
	SubscriberID int64  `json:"subscriberID"`
}

QueueWithDetails is an extended queue model with the new TTL-based retry fields (Migration 00018). It is used by NotificationService for guaranteed delivery with exponential backoff.

type Subscriber

type Subscriber struct {
	ID         int64     `json:"id"`                          // Unique subscriber ID
	ClientID   int64     `json:"clientID" db:"client_id"`     // Associated client/tenant ID
	Name       string    `json:"name"`                        // Subscriber name
	WebhookURL string    `json:"webhookURL" db:"webhook_url"` // HTTP endpoint for message delivery
	IsActive   bool      `json:"isActive" db:"is_active"`     // Only active subscribers receive messages
	CreatedAt  time.Time `json:"createdAt" db:"created_at"`   // Subscriber registration time
}

Subscriber represents a message consumer in the pub/sub system. Subscribers receive messages via webhooks when topics they're subscribed to receive new messages.

Each subscriber:

  • Has a webhook URL for message delivery
  • Is associated with a client (tenant/organization)
  • Can have multiple subscriptions to different topics
  • Can be activated/deactivated

func NewSubscriber

func NewSubscriber(clientID int64, name, webhookURL string) Subscriber

NewSubscriber creates a new active subscriber.

Parameters:

  • clientID: The client/tenant this subscriber belongs to
  • name: Human-readable subscriber name
  • webhookURL: HTTP endpoint to receive message deliveries

func (Subscriber) TableName

func (t Subscriber) TableName() string

TableName returns the database table name for Subscriber.

type Subscription

type Subscription struct {
	ID           int64        `json:"id"`           // Unique subscription ID
	SubscriberID int64        `json:"subscriberID"` // Subscriber who owns this subscription
	TopicID      int64        `json:"topicID"`      // Topic being subscribed to
	Identifier   string       `json:"identifier"`   // Event identifier filter
	IsActive     bool         `json:"isActive"`     // Active subscriptions receive messages
	CreatedAt    time.Time    `json:"createdAt"`    // Subscription creation time
	DeletedAt    sql.NullTime `json:"deletedAt"`    // Soft delete timestamp
}

Subscription represents a subscriber's subscription to a topic. Subscriptions connect subscribers to topics, enabling message delivery routing.

Each subscription:

  • Links a subscriber to a topic
  • Filters messages by identifier (e.g., "user-123")
  • Can be activated/deactivated (soft delete)
  • Creates queue items when matching messages are published

Lifecycle: Active subscriptions receive new messages, inactive ones don't.

func NewSubscription

func NewSubscription(subscriberID, topicID int64, identifier, _ string) Subscription

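NewSubscription creates a new active subscription. The final string parameter (formerly callbackURL) is bound to the blank identifier and ignored; it is retained for compatibility, and the webhook URL is stored on the Subscriber instead.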

Parameters:

  • subscriberID: The subscriber creating this subscription
  • topicID: The topic to subscribe to
  • identifier: Event identifier for filtering (e.g., "user-123", "order-*")
  • callbackURL (unnamed in the signature): Webhook URL; ignored here because it is stored on the Subscriber, kept only for compatibility

func (*Subscription) Deactivate

func (m *Subscription) Deactivate()

Deactivate performs a soft delete on the subscription. Deactivated subscriptions stop receiving new messages but are retained for audit purposes.

func (Subscription) TableName

func (m Subscription) TableName() string

TableName returns the database table name for Subscription.

type SubscriptionFull

type SubscriptionFull struct {
	Subscription        // Embedded base subscription
	Messages     int    // Count of messages delivered
	CallbackURL  string // Denormalized webhook URL from subscriber
}

SubscriptionFull is an extended subscription view with denormalized fields. Used by queries that need subscription details along with statistics and webhook URLs.

type SubscriptionV2

type SubscriptionV2 struct {
	ID                 int64      `json:"id"`
	SubscriberID       int64      `json:"subscriberID"`
	TopicID            int64      `json:"topicID"`
	Identifier         string     `json:"identifier"`
	CallbackURL        string     `json:"callbackURL"`
	IsActive           bool       `json:"isActive"`
	TrackID            string     `json:"trackId"`
	CreatedAt          time.Time  `json:"createdAt"`
	LastNotificationAt *time.Time `json:"lastNotificationAt"` // New field (Migration 00018)
}

SubscriptionV2 extends the subscription model with last_notification_at, which is used for duplicate prevention.

type Topic

type Topic struct {
	ID          int64     `json:"id"`                        // Unique topic ID
	Code        string    `json:"code" db:"topic_code"`      // Unique topic code (e.g., "user.signup")
	Name        string    `json:"name"`                      // Human-readable topic name
	Description string    `json:"description"`               // Topic purpose and details
	IsActive    bool      `json:"isActive" db:"is_active"`   // Only active topics accept new messages
	CreatedAt   time.Time `json:"createdAt" db:"created_at"` // Topic creation time
}

Topic represents a message category/channel in the pub/sub system. Topics define the routing mechanism for messages to subscribers.

When a message is published to a topic, it is delivered to all active subscriptions matching that topic and identifier.

Topics can be hierarchical using dot notation (e.g., "user.created", "order.payment.completed").

func NewTopic

func NewTopic(code, name, description string) Topic

NewTopic creates a new active topic.

Parameters:

  • code: Unique topic identifier (e.g., "user.signup", "order.created")
  • name: Human-readable name for display
  • description: Purpose and usage details for this topic

func (Topic) TableName

func (t Topic) TableName() string

TableName returns the database table name for Topic.
