Documentation
¶
Overview ¶
Package outbox provides domain entities for the notification outbox system.
Index ¶
- Variables
- type Event
- func (e *Event) AggregateID() *uuid.UUID
- func (e *Event) AggregateType() string
- func (e *Event) Body() string
- func (e *Event) CreatedAt() time.Time
- func (e *Event) EventType() string
- func (e *Event) ID() ID
- func (e *Event) IntegrationsFailed() int
- func (e *Event) IntegrationsMatched() int
- func (e *Event) IntegrationsSucceeded() int
- func (e *Event) IntegrationsTotal() int
- func (e *Event) LastError() string
- func (e *Event) Metadata() map[string]any
- func (e *Event) ProcessedAt() time.Time
- func (e *Event) RetryCount() int
- func (e *Event) SendResults() []SendResult
- func (e *Event) Severity() Severity
- func (e *Event) Status() EventStatus
- func (e *Event) TenantID() shared.ID
- func (e *Event) Title() string
- func (e *Event) URL() string
- type EventFilter
- type EventParams
- type EventRepository
- type EventStats
- type EventStatus
- type ID
- type Outbox
- func (o *Outbox) AggregateID() *uuid.UUID
- func (o *Outbox) AggregateType() string
- func (o *Outbox) Body() string
- func (o *Outbox) CanRetry() bool
- func (o *Outbox) CreatedAt() time.Time
- func (o *Outbox) EventType() string
- func (o *Outbox) GetMetadata(key string) (any, bool)
- func (o *Outbox) ID() ID
- func (o *Outbox) LastError() string
- func (o *Outbox) Lock(workerID string) error
- func (o *Outbox) LockedAt() *time.Time
- func (o *Outbox) LockedBy() string
- func (o *Outbox) MarkCompleted()
- func (o *Outbox) MarkDead(reason string)
- func (o *Outbox) MarkFailed(errorMessage string)
- func (o *Outbox) MaxRetries() int
- func (o *Outbox) Metadata() map[string]any
- func (o *Outbox) ProcessedAt() *time.Time
- func (o *Outbox) ResetForRetry()
- func (o *Outbox) RetryCount() int
- func (o *Outbox) ScheduledAt() time.Time
- func (o *Outbox) SetMetadata(key string, value any)
- func (o *Outbox) Severity() Severity
- func (o *Outbox) Status() OutboxStatus
- func (o *Outbox) TenantID() shared.ID
- func (o *Outbox) Title() string
- func (o *Outbox) URL() string
- func (o *Outbox) Unlock()
- func (o *Outbox) UpdatedAt() time.Time
- type OutboxFilter
- type OutboxParams
- type OutboxRepository
- type OutboxStats
- type OutboxStatus
- type ProcessingResults
- type SendResult
- type Severity
Constants ¶
This section is empty.
Variables ¶
var ( // ErrOutboxNotFound is returned when an outbox entry is not found. ErrOutboxNotFound = fmt.Errorf("%w: outbox entry not found", shared.ErrNotFound) // ErrOutboxAlreadyProcessed is returned when trying to process an already processed outbox entry. ErrOutboxAlreadyProcessed = fmt.Errorf("%w: outbox entry already processed", shared.ErrConflict) // ErrOutboxLocked is returned when an outbox entry is locked by another worker. ErrOutboxLocked = fmt.Errorf("%w: outbox entry is locked by another worker", shared.ErrConflict) // ErrInvalidOutboxStatus is returned when an invalid status transition is attempted. ErrInvalidOutboxStatus = fmt.Errorf("%w: invalid outbox status transition", shared.ErrValidation) // ErrEventNotFound is returned when a notification event is not found. ErrEventNotFound = fmt.Errorf("%w: notification event not found", shared.ErrNotFound) )
Domain errors for notification module.
Functions ¶
This section is empty.
Types ¶
type Event ¶
type Event struct {
// contains filtered or unexported fields
}
Event represents an archived notification event after processing. This is the permanent audit trail of all notifications.
func NewEventFromOutbox ¶
func NewEventFromOutbox(outbox *Outbox, results ProcessingResults) *Event
NewEventFromOutbox creates an event from a processed outbox entry.
func ReconstituteEvent ¶
func ReconstituteEvent( id ID, tenantID shared.ID, eventType string, aggregateType string, aggregateID *uuid.UUID, title string, body string, severity Severity, url string, metadata map[string]any, status EventStatus, integrationsTotal int, integrationsMatched int, integrationsSucceeded int, integrationsFailed int, sendResults []SendResult, lastError string, retryCount int, createdAt time.Time, processedAt time.Time, ) *Event
ReconstituteEvent recreates an event from persistence.
func (*Event) AggregateID ¶
func (*Event) AggregateType ¶
func (*Event) IntegrationsFailed ¶
func (*Event) IntegrationsMatched ¶
func (*Event) IntegrationsSucceeded ¶
func (*Event) IntegrationsTotal ¶
func (*Event) ProcessedAt ¶
func (*Event) RetryCount ¶
func (*Event) SendResults ¶
func (e *Event) SendResults() []SendResult
func (*Event) Status ¶
func (e *Event) Status() EventStatus
type EventFilter ¶
type EventFilter struct {
TenantID *shared.ID
Status *EventStatus
EventType string
AggregateType string
AggregateID *uuid.UUID
Limit int
Offset int
}
EventFilter contains filter options for listing events.
type EventParams ¶
type EventParams struct {
ID ID
TenantID shared.ID
EventType string
AggregateType string
AggregateID *uuid.UUID
Title string
Body string
Severity Severity
URL string
Metadata map[string]any
Status EventStatus
IntegrationsTotal int
IntegrationsMatched int
IntegrationsSucceeded int
IntegrationsFailed int
SendResults []SendResult
LastError string
RetryCount int
CreatedAt time.Time
ProcessedAt time.Time
}
EventParams contains parameters for creating a new event.
type EventRepository ¶
type EventRepository interface {
// Create inserts a new event.
Create(ctx context.Context, event *Event) error
// GetByID retrieves an event by ID.
GetByID(ctx context.Context, id ID) (*Event, error)
// Delete removes an event.
Delete(ctx context.Context, id ID) error
// ListByTenant retrieves events for a tenant with pagination.
ListByTenant(ctx context.Context, tenantID shared.ID, filter EventFilter) ([]*Event, int64, error)
// GetStats returns aggregated statistics for events.
GetStats(ctx context.Context, tenantID *shared.ID) (*EventStats, error)
// ListByIntegration retrieves events that were sent to a specific integration.
// Uses JSONB query on send_results array.
ListByIntegration(ctx context.Context, integrationID string, limit, offset int) ([]*Event, int64, error)
// DeleteOldEvents removes events older than the specified days.
// If retentionDays <= 0, no deletion is performed (unlimited retention).
DeleteOldEvents(ctx context.Context, retentionDays int) (int64, error)
}
EventRepository defines the interface for notification event persistence.
type EventStats ¶
EventStats represents aggregated statistics for events.
type EventStatus ¶
type EventStatus string
EventStatus represents the final processing status of a notification event.
const ( // EventStatusCompleted - At least one integration succeeded. EventStatusCompleted EventStatus = "completed" // EventStatusFailed - All integrations failed after retries. EventStatusFailed EventStatus = "failed" // EventStatusSkipped - No integrations matched the event filters. EventStatusSkipped EventStatus = "skipped" )
func (EventStatus) String ¶
func (s EventStatus) String() string
String returns the string representation of the status.
type Outbox ¶
type Outbox struct {
// contains filtered or unexported fields
}
Outbox represents a notification task in the outbox queue. It follows the Transactional Outbox Pattern for reliable message delivery.
func Reconstitute ¶
func Reconstitute( id ID, tenantID shared.ID, eventType string, aggregateType string, aggregateID *uuid.UUID, title string, body string, severity Severity, url string, metadata map[string]any, status OutboxStatus, retryCount int, maxRetries int, lastError string, scheduledAt time.Time, lockedAt *time.Time, lockedBy string, createdAt time.Time, updatedAt time.Time, processedAt *time.Time, ) *Outbox
Reconstitute recreates an outbox entry from persistence.
func (*Outbox) AggregateID ¶
func (*Outbox) AggregateType ¶
func (*Outbox) GetMetadata ¶
GetMetadata gets a metadata value by key.
func (*Outbox) MarkCompleted ¶
func (o *Outbox) MarkCompleted()
MarkCompleted marks the outbox entry as successfully processed.
func (*Outbox) MarkFailed ¶
MarkFailed marks the outbox entry as failed and schedules a retry if possible.
func (*Outbox) MaxRetries ¶
func (*Outbox) ProcessedAt ¶
func (*Outbox) ResetForRetry ¶
func (o *Outbox) ResetForRetry()
ResetForRetry resets a failed/dead entry to pending for manual retry. This resets the retry count and schedules immediate processing.
func (*Outbox) RetryCount ¶
func (*Outbox) ScheduledAt ¶
func (*Outbox) SetMetadata ¶
SetMetadata sets a metadata key-value pair.
func (*Outbox) Status ¶
func (o *Outbox) Status() OutboxStatus
type OutboxFilter ¶
type OutboxFilter struct {
TenantID *shared.ID
Status *OutboxStatus
EventType string
AggregateType string
Limit int
Offset int
}
OutboxFilter contains filter options for listing outbox entries.
type OutboxParams ¶
type OutboxParams struct {
TenantID shared.ID
EventType string
AggregateType string
AggregateID *uuid.UUID
Title string
Body string
Severity Severity
URL string
Metadata map[string]any
MaxRetries int // Optional, defaults to 3
ScheduledAt *time.Time // Optional, defaults to now
}
OutboxParams contains parameters for creating a new outbox entry.
type OutboxRepository ¶
type OutboxRepository interface {
// Create inserts a new outbox entry.
Create(ctx context.Context, outbox *Outbox) error
// CreateInTx inserts a new outbox entry within an existing transaction.
// This is the key method for the transactional outbox pattern.
// The tx parameter should be a *sql.Tx from the same database connection.
CreateInTx(ctx context.Context, tx *sql.Tx, outbox *Outbox) error
// GetByID retrieves an outbox entry by ID.
GetByID(ctx context.Context, id ID) (*Outbox, error)
// Update updates an outbox entry.
Update(ctx context.Context, outbox *Outbox) error
// Delete removes an outbox entry.
Delete(ctx context.Context, id ID) error
// FetchPendingBatch retrieves and locks a batch of pending outbox entries.
// Uses FOR UPDATE SKIP LOCKED for concurrent worker safety.
// Returns entries where scheduled_at <= now and status = 'pending'.
FetchPendingBatch(ctx context.Context, workerID string, batchSize int) ([]*Outbox, error)
// UnlockStale releases locks on entries that have been processing for too long.
// This handles worker crashes or timeouts.
UnlockStale(ctx context.Context, olderThanMinutes int) (int64, error)
// DeleteOldCompleted removes completed entries older than the specified days.
DeleteOldCompleted(ctx context.Context, olderThanDays int) (int64, error)
// DeleteOldFailed removes failed/dead entries older than the specified days.
DeleteOldFailed(ctx context.Context, olderThanDays int) (int64, error)
// List retrieves outbox entries with filtering and pagination.
List(ctx context.Context, filter OutboxFilter, page pagination.Pagination) (pagination.Result[*Outbox], error)
// GetStats returns aggregated statistics for outbox entries.
GetStats(ctx context.Context, tenantID *shared.ID) (*OutboxStats, error)
// ListByTenant retrieves outbox entries for a tenant with pagination.
ListByTenant(ctx context.Context, tenantID shared.ID, filter OutboxFilter) ([]*Outbox, int64, error)
// CountByStatus returns counts grouped by status for a tenant.
CountByStatus(ctx context.Context, tenantID shared.ID) (map[OutboxStatus]int64, error)
// GetByAggregateID retrieves outbox entries for a specific aggregate.
GetByAggregateID(ctx context.Context, aggregateType string, aggregateID string) ([]*Outbox, error)
}
OutboxRepository defines the interface for outbox persistence.
type OutboxStats ¶
type OutboxStats struct {
Pending int64
Processing int64
Completed int64
Failed int64
Dead int64
Total int64
}
OutboxStats contains aggregated statistics for outbox entries.
type OutboxStatus ¶
type OutboxStatus string
OutboxStatus represents the processing status of an outbox entry.
const ( // OutboxStatusPending - Task is waiting to be processed. OutboxStatusPending OutboxStatus = "pending" // OutboxStatusProcessing - Task is currently being processed by a worker. OutboxStatusProcessing OutboxStatus = "processing" // OutboxStatusCompleted - Task was processed successfully. OutboxStatusCompleted OutboxStatus = "completed" // OutboxStatusFailed - Task failed but may be retried. OutboxStatusFailed OutboxStatus = "failed" // OutboxStatusDead - Task failed permanently, requires manual intervention. OutboxStatusDead OutboxStatus = "dead" )
func (OutboxStatus) IsTerminal ¶
func (s OutboxStatus) IsTerminal() bool
IsTerminal returns true if the status is a terminal state (no more processing).
func (OutboxStatus) String ¶
func (s OutboxStatus) String() string
String returns the string representation of the status.
type ProcessingResults ¶
type ProcessingResults struct {
IntegrationsTotal int
IntegrationsMatched int
IntegrationsSucceeded int
IntegrationsFailed int
SendResults []SendResult
}
ProcessingResults contains the results of processing an outbox entry.
type SendResult ¶
type SendResult struct {
IntegrationID string `json:"integration_id"`
IntegrationName string `json:"name"`
Provider string `json:"provider"`
Status string `json:"status"` // success, failed
MessageID string `json:"message_id,omitempty"`
Error string `json:"error,omitempty"`
SentAt time.Time `json:"sent_at"`
}
SendResult represents the result of sending to a single integration.