Documentation
¶
Overview ¶
Package model contains all domain models and data structures for the PubSub system.
Index ¶
- Variables
- type Attributes
- type DLQStats
- type DataMessage
- type DeadLetterQueue
- type DomainError
- type Message
- type NotificationLog
- type PublishResult
- type Publisher
- type Queue
- func (t *Queue) CanAttemptDelivery(maxAttempts int) error
- func (t *Queue) GetAge() time.Duration
- func (t *Queue) GetTimeUntilExpiry() time.Duration
- func (t *Queue) GetTimeUntilRetry() (time.Duration, error)
- func (t *Queue) IsExpired() bool
- func (t *Queue) MarkFailed(err error, retryAfter time.Duration)
- func (t *Queue) MarkSent()
- func (t *Queue) RecordAttemptStart()
- func (t *Queue) SetComplete()
- func (t *Queue) ShouldMoveToDLQ(dlqThreshold int) bool
- func (t *Queue) ShouldRetry() bool
- func (t *Queue) TableName() string
- type QueueStatus
- type QueueWithDetails
- type Subscriber
- type Subscription
- type SubscriptionFull
- type SubscriptionV2
- type Topic
Constants ¶
This section is empty.
Variables ¶
var ( // ErrQueueItemExpired indicates the queue item has passed its expiration time. ErrQueueItemExpired = DomainError{Code: "QUEUE_EXPIRED", Message: "Queue item has expired"} // ErrQueueItemAlreadySent indicates the message was already successfully delivered. ErrQueueItemAlreadySent = DomainError{Code: "ALREADY_SENT", Message: "Queue item already sent"} // ErrMaxAttemptsExceeded indicates the item has reached the maximum retry attempts. ErrMaxAttemptsExceeded = DomainError{Code: "MAX_ATTEMPTS", Message: "Maximum delivery attempts exceeded"} // ErrNotReadyForRetry indicates the retry delay hasn't elapsed yet. ErrNotReadyForRetry = DomainError{Code: "NOT_READY", Message: "Not ready for retry yet"} // ErrNoRetryScheduled indicates no retry time has been set for this item. ErrNoRetryScheduled = DomainError{Code: "NO_RETRY", Message: "No retry scheduled"} )
Domain errors returned by Queue business logic methods.
Functions ¶
This section is empty.
Types ¶
type Attributes ¶
Attributes represents a map of key-value pairs for message metadata.
type DLQStats ¶
type DLQStats struct {
TotalItems int `json:"totalItems"`
UnresolvedItems int `json:"unresolvedItems"`
ResolvedItems int `json:"resolvedItems"`
OldestItemAge int64 `json:"oldestItemAge"` // Seconds
NewestItemAge int64 `json:"newestItemAge"` // Seconds
TopFailureReason string `json:"topFailureReason"`
LastUpdated time.Time `json:"lastUpdated"`
}
DLQStats represents aggregate statistics for the Dead Letter Queue. Used for monitoring dashboards and operational visibility.
type DataMessage ¶
type DataMessage struct {
MessageID string `json:"messageID"`
PublishTime time.Time `json:"publishTime"`
OrderingKey string `json:"orderingKey"`
Attributes Attributes `json:"attributes"`
Data string `json:"data"`
Identifier string
}
DataMessage represents a message with metadata for delivery to subscribers.
func NewDataMessage ¶
func NewDataMessage(messageID string, _ time.Time, identifier, data string) *DataMessage
NewDataMessage creates a new DataMessage with the given parameters.
func (*DataMessage) FromString ¶
func (d *DataMessage) FromString(_ string) error
FromString parses message data from a string (currently a no-op).
func (*DataMessage) ToBase64 ¶
func (d *DataMessage) ToBase64() string
ToBase64 returns the message data as a base64 string (currently returns empty).
func (*DataMessage) ToString ¶
func (d *DataMessage) ToString() string
ToString returns the message data as a string.
type DeadLetterQueue ¶
type DeadLetterQueue struct {
ID int64 `json:"id"`
SubscriptionID int64 `json:"subscriptionID" db:"subscription_id"`
MessageID int64 `json:"messageID" db:"message_id"`
OriginalQueueID int64 `json:"originalQueueId" db:"original_queue_id"` // Reference to original queue item
// Failure information
AttemptCount int `json:"attemptCount" db:"attempt_count"` // Total attempts before DLQ
LastError string `json:"lastError" db:"last_error"` // Last error message
FailureReason string `json:"failureReason" db:"failure_reason"` // Reason for moving to DLQ
// Timing information
FirstAttemptAt time.Time `json:"firstAttemptAt" db:"first_attempt_at"` // When first delivery was attempted
LastAttemptAt time.Time `json:"lastAttemptAt" db:"last_attempt_at"` // When last attempt failed
MovedToDLQAt time.Time `json:"movedToDlqAt" db:"moved_to_dlq_at"` // When moved to DLQ
// Message data (denormalized for easy access)
MessageData string `json:"messageData" db:"message_data"` // Original message payload
CallbackURL string `json:"callbackURL" db:"callback_url"` // Target webhook URL
// Lifecycle
IsResolved bool `json:"isResolved" db:"is_resolved"` // Manual resolution flag
ResolvedAt *time.Time `json:"resolvedAt" db:"resolved_at"` // When manually resolved
ResolvedBy string `json:"resolvedBy" db:"resolved_by"` // Who resolved (user/system)
ResolutionNote string `json:"resolutionNote" db:"resolution_note"` // Resolution explanation
CreatedAt time.Time `json:"createdAt" db:"created_at"`
}
DeadLetterQueue represents a permanently failed message that exceeded the retry threshold. Messages are automatically moved here after exhausting all retry attempts (typically 5-7 attempts).
The DLQ serves as:
- Failure audit log with full diagnostic information
- Manual intervention queue for operations teams
- Source for failure analysis and monitoring
Business logic methods:
- Resolve: Mark item as manually resolved
- GetAge: Calculate time in DLQ
- IsOld: Check if item needs attention
Items remain in DLQ until manually resolved or deleted.
func NewDeadLetterQueue ¶
func NewDeadLetterQueue( subscriptionID, messageID, originalQueueID int64, attemptCount int, lastError, failureReason string, firstAttemptAt, lastAttemptAt time.Time, messageData, callbackURL string, ) DeadLetterQueue
NewDeadLetterQueue creates a new Dead Letter Queue entry from a failed queue item. This is called automatically by QueueWorker when a queue item exceeds the retry threshold.
Denormalizes message data and callback URL for easy access without joining tables.
func (*DeadLetterQueue) GetAge ¶
func (d *DeadLetterQueue) GetAge() time.Duration
GetAge returns how long the item has been in the Dead Letter Queue.
func (*DeadLetterQueue) IsOld ¶
func (d *DeadLetterQueue) IsOld(threshold time.Duration) bool
IsOld checks if the item has been in DLQ longer than the threshold duration. Used to identify stuck items that need urgent attention.
func (*DeadLetterQueue) Resolve ¶
func (d *DeadLetterQueue) Resolve(resolvedBy, note string)
Resolve marks the DLQ item as manually resolved by an operator. This is typically used after:
- Manual message replay
- Fixing the root cause and redelivering
- Determining the failure is acceptable and can be ignored
Parameters:
- resolvedBy: Username/system that resolved the item
- note: Explanation of the resolution action taken
func (DeadLetterQueue) TableName ¶
func (d DeadLetterQueue) TableName() string
TableName returns the database table name for DeadLetterQueue.
type DomainError ¶
type DomainError struct {
Code string // Error code for programmatic handling
Message string // Human-readable error message
}
DomainError represents a domain-level business rule violation. Used by Queue methods to return business logic errors.
func (DomainError) Error ¶
func (e DomainError) Error() string
type Message ¶
type Message struct {
ID int64 `json:"id"` // Unique message ID
TopicID int64 `json:"topicID"` // Topic this message belongs to
Identifier string `json:"identifier"` // Event identifier (e.g., "user-123")
Data string `json:"data"` // Message payload (JSON or string)
CreatedAt time.Time `json:"createdAt"` // Publication timestamp
}
Message represents a published message in the pub/sub system. Messages are immutable once created and contain the actual payload to be delivered.
Each published message creates queue items for all active subscriptions to its topic. Messages are retained for archival/audit purposes even after successful delivery.
func NewMessage ¶
NewMessage creates a new message for publication. Messages are immutable after creation.
Parameters:
- topicID: The topic to publish to
- identifier: Event identifier for filtering/routing
- data: Message payload (typically JSON)
type NotificationLog ¶
type NotificationLog struct {
ID int64 `json:"id"`
SubscriptionID int64 `json:"subscriptionID"`
MessageID int64 `json:"messageID"`
Identifier string `json:"identifier"`
TopicCode string `json:"topicCode"`
SubscriberType string `json:"subscriberType"` // "client", "service"
SubscriberID int64 `json:"subscriberID"`
DeliveryMethod string `json:"deliveryMethod"` // "webhook", "grpc", "pull"
Status string `json:"status"` // "pending", "sent", "failed", "skipped"
SkippedReason sql.NullString `json:"skippedReason"`
SentAt sql.NullTime `json:"sentAt"`
CreatedAt time.Time `json:"createdAt"`
}
NotificationLog - Log of all notification attempts for analytics and debugging.
func NewNotificationLog ¶
func NewNotificationLog( subscriptionID int64, messageID int64, identifier string, topicCode string, subscriberType string, subscriberID int64, deliveryMethod string, status string, ) NotificationLog
NewNotificationLog creates a new notification log entry.
func (NotificationLog) TableName ¶
func (t NotificationLog) TableName() string
TableName returns the database table name for NotificationLog.
type PublishResult ¶
type PublishResult struct {
Result bool // True if publish succeeded
}
PublishResult represents the result of a publish operation. Currently contains a simple success flag, can be extended with message IDs, etc.
type Publisher ¶
type Publisher struct {
ID int64 `json:"id"` // Unique publisher ID
Code string `json:"code" db:"publisher_code"` // Unique publisher code (e.g., "user-service")
Name string `json:"name"` // Human-readable publisher name
Description string `json:"description"` // Publisher description
IsActive bool `json:"isActive" db:"is_active"` // Only active publishers can publish
CreatedAt time.Time `json:"createdAt" db:"created_at"` // Publisher registration time
}
Publisher represents a message publisher in the pub/sub system. Publishers are registered services or applications that can publish messages to topics.
Each publisher is identified by a unique code and can be activated/deactivated. Inactive publishers cannot publish new messages.
func NewPublisher ¶
NewPublisher creates a new active publisher.
Parameters:
- code: Unique publisher identifier (e.g., "user-service", "order-processor")
- name: Human-readable name for display
- description: Purpose and details about this publisher
type Queue ¶
type Queue struct {
ID int64 `json:"id"`
SubscriptionID int64 `json:"subscriptionID"`
MessageID int64 `json:"messageID"`
Status QueueStatus `json:"status" db:"status"` // NEW: from 00019
AttemptCount int `json:"attemptCount" db:"attempt_count"` // NEW: from 00019
LastAttemptAt sql.NullTime `json:"lastAttemptAt" db:"last_attempt_at"` // NEW: from 00019
NextRetryAt sql.NullTime `json:"nextRetryAt" db:"next_retry_at"` // NEW: from 00019
LastError sql.NullString `json:"lastError" db:"last_error"` // NEW: from 00019
ExpiresAt time.Time `json:"expiresAt" db:"expires_at"` // NEW: from 00019
SequenceNumber int64 `json:"sequenceNumber" db:"sequence_number"` // NEW: from 00019
OperationTimestamp time.Time `json:"operationTimestamp" db:"operation_timestamp"` // NEW: from 00019
RetryAt sql.NullTime `json:"retryAt"` // LEGACY: keep for backward compatibility
IsComplete bool `json:"isComplete"` // LEGACY: deprecated, use Status
CompletedAt sql.NullTime `json:"completedAt"`
CreatedAt time.Time `json:"createdAt"`
}
Queue represents a message queued for delivery to a subscriber. It contains retry logic state, timing information, and error tracking.
Queue items follow this lifecycle:
- Created with status=PENDING
- Delivery attempted → either SENT (success) or FAILED (retry)
- FAILED items retry with exponential backoff
- After exceeding retry threshold → moved to Dead Letter Queue (DLQ)
Business logic methods:
- MarkSent/MarkFailed: Update status after delivery attempt
- CanAttemptDelivery: Check if delivery can be attempted
- ShouldRetry: Check if item is ready for retry
- ShouldMoveToDLQ: Check if exhausted retries
This model implements Domain-Driven Design with rich business logic.
func NewQueue ¶
NewQueue creates a new queue item for message delivery. Initial state: PENDING, AttemptCount=0, NextRetryAt=now (ready immediately). Default expiry: 24 hours from creation.
func (*Queue) CanAttemptDelivery ¶
CanAttemptDelivery validates whether delivery can be attempted based on business rules. Checks expiration, status, max attempts, and retry timing.
Returns error if delivery cannot be attempted:
- ErrQueueItemExpired: Item has expired
- ErrQueueItemAlreadySent: Already successfully delivered
- ErrMaxAttemptsExceeded: Exceeded retry limit
- ErrNotReadyForRetry: Too soon for retry
func (*Queue) GetTimeUntilExpiry ¶
GetTimeUntilExpiry returns the duration until the item expires. Negative duration means already expired.
func (*Queue) GetTimeUntilRetry ¶
GetTimeUntilRetry returns the duration until the next retry attempt. Returns 0 if ready for retry now, or error if no retry is scheduled.
func (*Queue) IsExpired ¶
IsExpired checks if the queue item has passed its expiration time. Expired items are cleaned up by the queue worker.
func (*Queue) MarkFailed ¶
MarkFailed marks the queue item as failed and schedules the next retry attempt. Increments attempt count, records error message, and calculates next retry time.
Parameters:
- err: The delivery error (stored in LastError)
- retryAfter: Duration to wait before next retry (exponential backoff)
func (*Queue) MarkSent ¶
func (t *Queue) MarkSent()
MarkSent marks the queue item as successfully delivered. Sets status to SENT and updates timing fields.
func (*Queue) RecordAttemptStart ¶
func (t *Queue) RecordAttemptStart()
RecordAttemptStart marks the beginning of a delivery attempt. Records timing only - attempt count is incremented by MarkFailed or MarkSent.
func (*Queue) SetComplete ¶
func (t *Queue) SetComplete()
SetComplete marks the queue item as complete (deprecated, use MarkSent instead).
func (*Queue) ShouldMoveToDLQ ¶
ShouldMoveToDLQ checks if the item should be moved to the Dead Letter Queue. Returns true when attempt count reaches the DLQ threshold and status is FAILED.
func (*Queue) ShouldRetry ¶
ShouldRetry checks if the item is ready for retry attempt. Returns true if status=FAILED, has valid NextRetryAt, and time has passed.
type QueueStatus ¶
type QueueStatus string
QueueStatus represents the lifecycle state of a queue item.
const ( // QueueStatusPending indicates the message is awaiting first delivery attempt. QueueStatusPending QueueStatus = "pending" // QueueStatusSent indicates successful message delivery. QueueStatusSent QueueStatus = "sent" // QueueStatusFailed indicates delivery failed and item is awaiting retry. QueueStatusFailed QueueStatus = "failed" )
type QueueWithDetails ¶
type QueueWithDetails struct {
// Core fields
QueueID int64 `json:"queueId"`
MessageID int64 `json:"messageID"`
SubscriptionID int64 `json:"subscriptionID"`
Status string `json:"status"` // "pending", "sent", "failed"
// Retry tracking
AttemptCount int `json:"attemptCount"`
LastAttemptAt *time.Time `json:"lastAttemptAt"`
NextRetryAt *time.Time `json:"nextRetryAt"`
LastError string `json:"lastError"`
// Ordering and TTL
SequenceNumber int64 `json:"sequenceNumber"`
OperationTime time.Time `json:"operationTime"`
ExpiresAt *time.Time `json:"expiresAt"`
CreatedAt time.Time `json:"createdAt"`
// Message details
Identifier string `json:"identifier"`
Data []byte `json:"data"`
// Subscription details
CallbackURL string `json:"callbackURL"`
SubscriberID int64 `json:"subscriberID"`
}
QueueWithDetails - Extended queue model with new TTL-based retry fields (Migration 00018). Used by NotificationService for guaranteed delivery with exponential backoff.
type Subscriber ¶
type Subscriber struct {
ID int64 `json:"id"` // Unique subscriber ID
ClientID int64 `json:"clientID" db:"client_id"` // Associated client/tenant ID
Name string `json:"name"` // Subscriber name
WebhookURL string `json:"webhookURL" db:"webhook_url"` // HTTP endpoint for message delivery
IsActive bool `json:"isActive" db:"is_active"` // Only active subscribers receive messages
CreatedAt time.Time `json:"createdAt" db:"created_at"` // Subscriber registration time
}
Subscriber represents a message consumer in the pub/sub system. Subscribers receive messages via webhooks when topics they're subscribed to receive new messages.
Each subscriber:
- Has a webhook URL for message delivery
- Is associated with a client (tenant/organization)
- Can have multiple subscriptions to different topics
- Can be activated/deactivated
func NewSubscriber ¶
func NewSubscriber(clientID int64, name, webhookURL string) Subscriber
NewSubscriber creates a new active subscriber.
Parameters:
- clientID: The client/tenant this subscriber belongs to
- name: Human-readable subscriber name
- webhookURL: HTTP endpoint to receive message deliveries
func (Subscriber) TableName ¶
func (t Subscriber) TableName() string
TableName returns the database table name for Subscriber.
type Subscription ¶
type Subscription struct {
ID int64 `json:"id"` // Unique subscription ID
SubscriberID int64 `json:"subscriberID"` // Subscriber who owns this subscription
TopicID int64 `json:"topicID"` // Topic being subscribed to
Identifier string `json:"identifier"` // Event identifier filter
IsActive bool `json:"isActive"` // Active subscriptions receive messages
CreatedAt time.Time `json:"createdAt"` // Subscription creation time
DeletedAt sql.NullTime `json:"deletedAt"` // Soft delete timestamp
}
Subscription represents a subscriber's subscription to a topic. Subscriptions connect subscribers to topics, enabling message delivery routing.
Each subscription:
- Links a subscriber to a topic
- Filters messages by identifier (e.g., "user-123")
- Can be activated/deactivated (soft delete)
- Creates queue items when matching messages are published
Lifecycle: Active subscriptions receive new messages, inactive ones don't.
func NewSubscription ¶
func NewSubscription(subscriberID, topicID int64, identifier, _ string) Subscription
NewSubscription creates a new active subscription. The callbackURL parameter is retained for compatibility but stored on the Subscriber.
Parameters:
- subscriberID: The subscriber creating this subscription
- topicID: The topic to subscribe to
- identifier: Event identifier for filtering (e.g., "user-123", "order-*")
- callbackURL: Webhook URL (typically stored on Subscriber, parameter kept for compatibility)
func (*Subscription) Deactivate ¶
func (m *Subscription) Deactivate()
Deactivate performs a soft delete on the subscription. Deactivated subscriptions stop receiving new messages but are retained for audit purposes.
func (Subscription) TableName ¶
func (m Subscription) TableName() string
TableName returns the database table name for Subscription.
type SubscriptionFull ¶
type SubscriptionFull struct {
Subscription // Embedded base subscription
Messages int // Count of messages delivered
CallbackURL string // Denormalized webhook URL from subscriber
}
SubscriptionFull is an extended subscription view with denormalized fields. Used by queries that need subscription details along with statistics and webhook URLs.
type SubscriptionV2 ¶
type SubscriptionV2 struct {
ID int64 `json:"id"`
SubscriberID int64 `json:"subscriberID"`
TopicID int64 `json:"topicID"`
Identifier string `json:"identifier"`
CallbackURL string `json:"callbackURL"`
IsActive bool `json:"isActive"`
TrackID string `json:"trackId"`
CreatedAt time.Time `json:"createdAt"`
LastNotificationAt *time.Time `json:"lastNotificationAt"` // New field (Migration 00018)
}
SubscriptionV2 - Extended with last_notification_at for duplicate prevention.
type Topic ¶
type Topic struct {
ID int64 `json:"id"` // Unique topic ID
Code string `json:"code" db:"topic_code"` // Unique topic code (e.g., "user.signup")
Name string `json:"name"` // Human-readable topic name
Description string `json:"description"` // Topic purpose and details
IsActive bool `json:"isActive" db:"is_active"` // Only active topics accept new messages
CreatedAt time.Time `json:"createdAt" db:"created_at"` // Topic creation time
}
Topic represents a message category/channel in the pub/sub system. Topics define the routing mechanism for messages to subscribers.
When a message is published to a topic, it is delivered to all active subscriptions matching that topic and identifier.
Topics can be hierarchical using dot notation (e.g., "user.created", "order.payment.completed").