outbox

package
v0.1.3 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 7, 2026 License: GPL-3.0 Imports: 7 Imported by: 0

Documentation

Overview

Package outbox provides domain entities for the notification outbox system.

Index

Constants

This section is empty.

Variables

View Source
// Domain errors for the notification module.
//
// Every sentinel below is built with fmt.Errorf and the %w verb, so it wraps
// one of the shared category errors (shared.ErrNotFound, shared.ErrConflict,
// shared.ErrValidation). Callers can therefore match either the specific
// sentinel or its category with errors.Is.
var (
	// ErrOutboxNotFound is returned when an outbox entry is not found.
	// It wraps shared.ErrNotFound.
	ErrOutboxNotFound = fmt.Errorf("%w: outbox entry not found", shared.ErrNotFound)

	// ErrOutboxAlreadyProcessed is returned when trying to process an already processed outbox entry.
	// It wraps shared.ErrConflict.
	ErrOutboxAlreadyProcessed = fmt.Errorf("%w: outbox entry already processed", shared.ErrConflict)

	// ErrOutboxLocked is returned when an outbox entry is locked by another worker.
	// It wraps shared.ErrConflict.
	ErrOutboxLocked = fmt.Errorf("%w: outbox entry is locked by another worker", shared.ErrConflict)

	// ErrInvalidOutboxStatus is returned when an invalid status transition is attempted.
	// It wraps shared.ErrValidation.
	ErrInvalidOutboxStatus = fmt.Errorf("%w: invalid outbox status transition", shared.ErrValidation)

	// ErrEventNotFound is returned when a notification event is not found.
	// It wraps shared.ErrNotFound.
	ErrEventNotFound = fmt.Errorf("%w: notification event not found", shared.ErrNotFound)
)

Domain errors for the notification module.

Functions

This section is empty.

Types

type Event

type Event struct {
	// contains filtered or unexported fields
}

Event represents an archived notification event after processing. This is the permanent audit trail of all notifications.

func NewEvent

func NewEvent(params EventParams) *Event

NewEvent creates a new event.

func NewEventFromOutbox

func NewEventFromOutbox(outbox *Outbox, results ProcessingResults) *Event

NewEventFromOutbox creates an event from a processed outbox entry.

func ReconstituteEvent

func ReconstituteEvent(
	id ID,
	tenantID shared.ID,
	eventType string,
	aggregateType string,
	aggregateID *uuid.UUID,
	title string,
	body string,
	severity Severity,
	url string,
	metadata map[string]any,
	status EventStatus,
	integrationsTotal int,
	integrationsMatched int,
	integrationsSucceeded int,
	integrationsFailed int,
	sendResults []SendResult,
	lastError string,
	retryCount int,
	createdAt time.Time,
	processedAt time.Time,
) *Event

ReconstituteEvent recreates an event from persistence.

func (*Event) AggregateID

func (e *Event) AggregateID() *uuid.UUID

func (*Event) AggregateType

func (e *Event) AggregateType() string

func (*Event) Body

func (e *Event) Body() string

func (*Event) CreatedAt

func (e *Event) CreatedAt() time.Time

func (*Event) EventType

func (e *Event) EventType() string

func (*Event) ID

func (e *Event) ID() ID

func (*Event) IntegrationsFailed

func (e *Event) IntegrationsFailed() int

func (*Event) IntegrationsMatched

func (e *Event) IntegrationsMatched() int

func (*Event) IntegrationsSucceeded

func (e *Event) IntegrationsSucceeded() int

func (*Event) IntegrationsTotal

func (e *Event) IntegrationsTotal() int

func (*Event) LastError

func (e *Event) LastError() string

func (*Event) Metadata

func (e *Event) Metadata() map[string]any

func (*Event) ProcessedAt

func (e *Event) ProcessedAt() time.Time

func (*Event) RetryCount

func (e *Event) RetryCount() int

func (*Event) SendResults

func (e *Event) SendResults() []SendResult

func (*Event) Severity

func (e *Event) Severity() Severity

func (*Event) Status

func (e *Event) Status() EventStatus

func (*Event) TenantID

func (e *Event) TenantID() shared.ID

func (*Event) Title

func (e *Event) Title() string

func (*Event) URL

func (e *Event) URL() string

type EventFilter

// EventFilter contains filter options for listing events.
//
// NOTE(review): how unset fields are interpreted is decided by the repository
// implementation, which is not visible here; the per-field notes below are
// assumptions to confirm.
type EventFilter struct {
	// TenantID is a pointer and may be nil; presumably nil means "any tenant" — TODO confirm.
	TenantID      *shared.ID
	// Status is a pointer and may be nil; presumably nil means "any status" — TODO confirm.
	Status        *EventStatus
	// EventType filters by event type; presumably "" disables the filter — TODO confirm.
	EventType     string
	// AggregateType filters by aggregate type; presumably "" disables the filter — TODO confirm.
	AggregateType string
	// AggregateID is a pointer and may be nil; presumably nil means "any aggregate" — TODO confirm.
	AggregateID   *uuid.UUID
	// Limit is presumably the maximum number of events to return — TODO confirm how 0 is treated.
	Limit         int
	// Offset is presumably the number of matching events to skip — TODO confirm.
	Offset        int
}

EventFilter contains filter options for listing events.

type EventParams

// EventParams contains parameters for creating a new event with NewEvent.
//
// The fields match, in name and order, the arguments of ReconstituteEvent and
// the read accessors exposed by Event.
type EventParams struct {
	// ID is the event identifier.
	// NOTE(review): whether NewEvent generates one when this is the zero value is not visible here.
	ID                    ID
	// TenantID identifies the tenant associated with the event.
	TenantID              shared.ID
	// EventType is the event type, carried as a plain string.
	EventType             string
	// AggregateType is the aggregate type, carried as a plain string.
	AggregateType         string
	// AggregateID is the aggregate identifier; it is a pointer and may be nil.
	AggregateID           *uuid.UUID
	// Title is the notification title.
	Title                 string
	// Body is the notification body text.
	Body                  string
	// Severity is the notification severity level.
	Severity              Severity
	// URL is a link associated with the notification.
	URL                   string
	// Metadata holds arbitrary key/value data attached to the event.
	Metadata              map[string]any
	// Status is the final processing status of the event.
	Status                EventStatus
	// IntegrationsTotal, IntegrationsMatched, IntegrationsSucceeded and
	// IntegrationsFailed are per-event integration counters; the same four
	// counters appear on ProcessingResults.
	IntegrationsTotal     int
	IntegrationsMatched   int
	IntegrationsSucceeded int
	IntegrationsFailed    int
	// SendResults lists the per-integration send outcomes.
	SendResults           []SendResult
	// LastError is the most recent error message, if any.
	LastError             string
	// RetryCount is the number of retries recorded for the event.
	RetryCount            int
	// CreatedAt is the creation timestamp.
	CreatedAt             time.Time
	// ProcessedAt is the processing timestamp.
	ProcessedAt           time.Time
}

EventParams contains parameters for creating a new event.

type EventRepository

// EventRepository defines the interface for notification event persistence.
type EventRepository interface {

	// Create inserts a new event.
	Create(ctx context.Context, event *Event) error

	// GetByID retrieves an event by ID.
	// NOTE(review): presumably reports ErrEventNotFound when no event exists — confirm in the implementation.
	GetByID(ctx context.Context, id ID) (*Event, error)

	// Delete removes an event.
	Delete(ctx context.Context, id ID) error

	// ListByTenant retrieves events for a tenant with pagination.
	// The int64 result is presumably the total number of matching events — TODO confirm.
	// NOTE(review): filter also carries a TenantID; how it interacts with the
	// tenantID argument is not visible from this interface.
	ListByTenant(ctx context.Context, tenantID shared.ID, filter EventFilter) ([]*Event, int64, error)

	// GetStats returns aggregated statistics for events.
	// tenantID is a pointer; presumably nil means statistics across all tenants — TODO confirm.
	GetStats(ctx context.Context, tenantID *shared.ID) (*EventStats, error)

	// ListByIntegration retrieves events that were sent to a specific integration.
	// Uses JSONB query on send_results array.
	// The int64 result is presumably the total number of matching events — TODO confirm.
	ListByIntegration(ctx context.Context, integrationID string, limit, offset int) ([]*Event, int64, error)

	// DeleteOldEvents removes events older than the specified days.
	// If retentionDays <= 0, no deletion is performed (unlimited retention).
	// The int64 result is presumably the number of events deleted — TODO confirm.
	DeleteOldEvents(ctx context.Context, retentionDays int) (int64, error)
}

EventRepository defines the interface for notification event persistence.

type EventStats

// EventStats represents aggregated statistics for events.
//
// Completed, Failed and Skipped share their names with the three EventStatus
// values; presumably each is the number of events in that status — TODO confirm.
type EventStats struct {
	// Completed is the count for the "completed" status.
	Completed int64
	// Failed is the count for the "failed" status.
	Failed    int64
	// Skipped is the count for the "skipped" status.
	Skipped   int64
	// Total is presumably the overall number of events — TODO confirm it equals the sum of the above.
	Total     int64
}

EventStats represents aggregated statistics for events.

type EventStatus

// EventStatus represents the final processing status of a notification event.
// It is a string type; the defined values are the EventStatus* constants.
type EventStatus string

EventStatus represents the final processing status of a notification event.

// Final processing statuses recorded on a notification event.
const (
	// EventStatusCompleted means at least one integration succeeded.
	EventStatusCompleted EventStatus = "completed"

	// EventStatusFailed means all integrations failed after retries.
	EventStatusFailed EventStatus = "failed"

	// EventStatusSkipped means no integrations matched the event filters.
	EventStatusSkipped EventStatus = "skipped"
)

func (EventStatus) String

func (s EventStatus) String() string

String returns the string representation of the status.

type ID

// ID is the identifier type used by this package. It is an alias of shared.ID
// (not a distinct type) and is used for both outbox entries and events.
type ID = shared.ID

ID represents a notification ID. It is an alias of shared.ID and identifies both outbox entries and archived events.

func NewID

func NewID() ID

NewID generates a new notification ID.

func ParseID

func ParseID(s string) (ID, error)

ParseID parses a string into a notification ID.

type Outbox

type Outbox struct {
	// contains filtered or unexported fields
}

Outbox represents a notification task in the outbox queue. It follows the Transactional Outbox Pattern for reliable message delivery.

func NewOutbox

func NewOutbox(params OutboxParams) *Outbox

NewOutbox creates a new outbox entry.

func Reconstitute

func Reconstitute(
	id ID,
	tenantID shared.ID,
	eventType string,
	aggregateType string,
	aggregateID *uuid.UUID,
	title string,
	body string,
	severity Severity,
	url string,
	metadata map[string]any,
	status OutboxStatus,
	retryCount int,
	maxRetries int,
	lastError string,
	scheduledAt time.Time,
	lockedAt *time.Time,
	lockedBy string,
	createdAt time.Time,
	updatedAt time.Time,
	processedAt *time.Time,
) *Outbox

Reconstitute recreates an outbox entry from persistence.

func (*Outbox) AggregateID

func (o *Outbox) AggregateID() *uuid.UUID

func (*Outbox) AggregateType

func (o *Outbox) AggregateType() string

func (*Outbox) Body

func (o *Outbox) Body() string

func (*Outbox) CanRetry

func (o *Outbox) CanRetry() bool

CanRetry returns true if the task can be retried.

func (*Outbox) CreatedAt

func (o *Outbox) CreatedAt() time.Time

func (*Outbox) EventType

func (o *Outbox) EventType() string

func (*Outbox) GetMetadata

func (o *Outbox) GetMetadata(key string) (any, bool)

GetMetadata gets a metadata value by key.

func (*Outbox) ID

func (o *Outbox) ID() ID

func (*Outbox) LastError

func (o *Outbox) LastError() string

func (*Outbox) Lock

func (o *Outbox) Lock(workerID string) error

Lock marks the outbox entry as being processed by a worker.

func (*Outbox) LockedAt

func (o *Outbox) LockedAt() *time.Time

func (*Outbox) LockedBy

func (o *Outbox) LockedBy() string

func (*Outbox) MarkCompleted

func (o *Outbox) MarkCompleted()

MarkCompleted marks the outbox entry as successfully processed.

func (*Outbox) MarkDead

func (o *Outbox) MarkDead(reason string)

MarkDead marks the outbox entry as dead (requires manual intervention).

func (*Outbox) MarkFailed

func (o *Outbox) MarkFailed(errorMessage string)

MarkFailed marks the outbox entry as failed and schedules a retry if possible.

func (*Outbox) MaxRetries

func (o *Outbox) MaxRetries() int

func (*Outbox) Metadata

func (o *Outbox) Metadata() map[string]any

func (*Outbox) ProcessedAt

func (o *Outbox) ProcessedAt() *time.Time

func (*Outbox) ResetForRetry

func (o *Outbox) ResetForRetry()

ResetForRetry resets a failed/dead entry to pending for manual retry. This resets the retry count and schedules immediate processing.

func (*Outbox) RetryCount

func (o *Outbox) RetryCount() int

func (*Outbox) ScheduledAt

func (o *Outbox) ScheduledAt() time.Time

func (*Outbox) SetMetadata

func (o *Outbox) SetMetadata(key string, value any)

SetMetadata sets a metadata key-value pair.

func (*Outbox) Severity

func (o *Outbox) Severity() Severity

func (*Outbox) Status

func (o *Outbox) Status() OutboxStatus

func (*Outbox) TenantID

func (o *Outbox) TenantID() shared.ID

func (*Outbox) Title

func (o *Outbox) Title() string

func (*Outbox) URL

func (o *Outbox) URL() string

func (*Outbox) Unlock

func (o *Outbox) Unlock()

Unlock releases the lock on the outbox entry (for cleanup of stuck tasks).

func (*Outbox) UpdatedAt

func (o *Outbox) UpdatedAt() time.Time

type OutboxFilter

// OutboxFilter contains filter options for listing outbox entries.
//
// NOTE(review): how unset fields are interpreted is decided by the repository
// implementation, which is not visible here; the per-field notes below are
// assumptions to confirm.
type OutboxFilter struct {
	// TenantID is a pointer and may be nil; presumably nil means "any tenant" — TODO confirm.
	TenantID      *shared.ID
	// Status is a pointer and may be nil; presumably nil means "any status" — TODO confirm.
	Status        *OutboxStatus
	// EventType filters by event type; presumably "" disables the filter — TODO confirm.
	EventType     string
	// AggregateType filters by aggregate type; presumably "" disables the filter — TODO confirm.
	AggregateType string
	// Limit is presumably the maximum number of entries to return — TODO confirm how 0 is treated.
	Limit         int
	// Offset is presumably the number of matching entries to skip — TODO confirm.
	Offset        int
}

OutboxFilter contains filter options for listing outbox entries.

type OutboxParams

// OutboxParams contains parameters for creating a new outbox entry with NewOutbox.
type OutboxParams struct {
	// TenantID identifies the tenant associated with the entry.
	TenantID      shared.ID
	// EventType is the event type, carried as a plain string.
	EventType     string
	// AggregateType is the aggregate type, carried as a plain string.
	AggregateType string
	// AggregateID is the aggregate identifier; it is a pointer and may be nil.
	AggregateID   *uuid.UUID
	// Title is the notification title.
	Title         string
	// Body is the notification body text.
	Body          string
	// Severity is the notification severity level.
	Severity      Severity
	// URL is a link associated with the notification.
	URL           string
	// Metadata holds arbitrary key/value data attached to the entry.
	Metadata      map[string]any
	// MaxRetries bounds the retries for the entry.
	// NOTE(review): presumably the zero value triggers the default — confirm in NewOutbox.
	MaxRetries    int        // Optional, defaults to 3
	// ScheduledAt is when the entry becomes eligible for processing.
	// NOTE(review): presumably nil triggers the default — confirm in NewOutbox.
	ScheduledAt   *time.Time // Optional, defaults to now
}

OutboxParams contains parameters for creating a new outbox entry.

type OutboxRepository

// OutboxRepository defines the interface for outbox persistence.
type OutboxRepository interface {

	// Create inserts a new outbox entry.
	Create(ctx context.Context, outbox *Outbox) error

	// CreateInTx inserts a new outbox entry within an existing transaction.
	// This is the key method for the transactional outbox pattern.
	// The tx parameter should be a *sql.Tx from the same database connection.
	CreateInTx(ctx context.Context, tx *sql.Tx, outbox *Outbox) error

	// GetByID retrieves an outbox entry by ID.
	// NOTE(review): presumably reports ErrOutboxNotFound when no entry exists — confirm in the implementation.
	GetByID(ctx context.Context, id ID) (*Outbox, error)

	// Update updates an outbox entry.
	Update(ctx context.Context, outbox *Outbox) error

	// Delete removes an outbox entry.
	Delete(ctx context.Context, id ID) error

	// FetchPendingBatch retrieves and locks a batch of pending outbox entries.
	// Uses FOR UPDATE SKIP LOCKED for concurrent worker safety.
	// Returns entries where scheduled_at <= now and status = 'pending'.
	// NOTE(review): workerID is presumably recorded as the lock owner and
	// batchSize presumably caps the number of entries returned — confirm.
	FetchPendingBatch(ctx context.Context, workerID string, batchSize int) ([]*Outbox, error)

	// UnlockStale releases locks on entries that have been processing for too long.
	// This handles worker crashes or timeouts.
	// The int64 result is presumably the number of entries unlocked — TODO confirm.
	UnlockStale(ctx context.Context, olderThanMinutes int) (int64, error)

	// DeleteOldCompleted removes completed entries older than the specified days.
	// The int64 result is presumably the number of entries deleted — TODO confirm.
	DeleteOldCompleted(ctx context.Context, olderThanDays int) (int64, error)

	// DeleteOldFailed removes failed/dead entries older than the specified days.
	// The int64 result is presumably the number of entries deleted — TODO confirm.
	DeleteOldFailed(ctx context.Context, olderThanDays int) (int64, error)

	// List retrieves outbox entries with filtering and pagination.
	// NOTE(review): filter also has Limit/Offset fields; how they interact with
	// the page argument is not visible from this interface.
	List(ctx context.Context, filter OutboxFilter, page pagination.Pagination) (pagination.Result[*Outbox], error)

	// GetStats returns aggregated statistics for outbox entries.
	// tenantID is a pointer; presumably nil means statistics across all tenants — TODO confirm.
	GetStats(ctx context.Context, tenantID *shared.ID) (*OutboxStats, error)

	// ListByTenant retrieves outbox entries for a tenant with pagination.
	// The int64 result is presumably the total number of matching entries — TODO confirm.
	// NOTE(review): filter also carries a TenantID; how it interacts with the
	// tenantID argument is not visible from this interface.
	ListByTenant(ctx context.Context, tenantID shared.ID, filter OutboxFilter) ([]*Outbox, int64, error)

	// CountByStatus returns counts grouped by status for a tenant.
	CountByStatus(ctx context.Context, tenantID shared.ID) (map[OutboxStatus]int64, error)

	// GetByAggregateID retrieves outbox entries for a specific aggregate.
	// NOTE(review): aggregateID is a string here, while Outbox.AggregateID
	// returns *uuid.UUID; the expected string format is not visible — confirm.
	GetByAggregateID(ctx context.Context, aggregateType string, aggregateID string) ([]*Outbox, error)
}

OutboxRepository defines the interface for outbox persistence.

type OutboxStats

// OutboxStats contains aggregated statistics for outbox entries.
//
// Pending, Processing, Completed, Failed and Dead share their names with the
// five OutboxStatus values; presumably each is the number of entries in that
// status — TODO confirm.
type OutboxStats struct {
	// Pending is the count for the "pending" status.
	Pending    int64
	// Processing is the count for the "processing" status.
	Processing int64
	// Completed is the count for the "completed" status.
	Completed  int64
	// Failed is the count for the "failed" status.
	Failed     int64
	// Dead is the count for the "dead" status.
	Dead       int64
	// Total is presumably the overall number of entries — TODO confirm it equals the sum of the above.
	Total      int64
}

OutboxStats contains aggregated statistics for outbox entries.

type OutboxStatus

// OutboxStatus represents the processing status of an outbox entry.
// It is a string type; the defined values are the OutboxStatus* constants.
type OutboxStatus string

OutboxStatus represents the processing status of an outbox entry.

// Processing statuses of an outbox entry.
const (
	// OutboxStatusPending means the task is waiting to be processed.
	OutboxStatusPending OutboxStatus = "pending"

	// OutboxStatusProcessing means the task is currently being processed by a worker.
	OutboxStatusProcessing OutboxStatus = "processing"

	// OutboxStatusCompleted means the task was processed successfully.
	OutboxStatusCompleted OutboxStatus = "completed"

	// OutboxStatusFailed means the task failed but may be retried.
	OutboxStatusFailed OutboxStatus = "failed"

	// OutboxStatusDead means the task failed permanently and requires manual intervention.
	OutboxStatusDead OutboxStatus = "dead"
)

func (OutboxStatus) IsTerminal

func (s OutboxStatus) IsTerminal() bool

IsTerminal returns true if the status is a terminal state (no more processing).

func (OutboxStatus) String

func (s OutboxStatus) String() string

String returns the string representation of the status.

type ProcessingResults

// ProcessingResults contains the results of processing an outbox entry.
// It is the second argument of NewEventFromOutbox, and its fields share their
// names with the integration counters and send results exposed by Event.
type ProcessingResults struct {
	// IntegrationsTotal is presumably the number of integrations considered — TODO confirm.
	IntegrationsTotal     int
	// IntegrationsMatched is presumably the number whose filters matched the event — TODO confirm.
	IntegrationsMatched   int
	// IntegrationsSucceeded is presumably the number of successful sends — TODO confirm.
	IntegrationsSucceeded int
	// IntegrationsFailed is presumably the number of failed sends — TODO confirm.
	IntegrationsFailed    int
	// SendResults holds one SendResult per integration send attempt (presumably) — TODO confirm.
	SendResults           []SendResult
}

ProcessingResults contains the results of processing an outbox entry.

type SendResult

// SendResult represents the result of sending to a single integration.
// The struct tags define its JSON encoding.
type SendResult struct {
	// IntegrationID is encoded as "integration_id".
	IntegrationID   string    `json:"integration_id"`
	// IntegrationName is encoded as "name" (not "integration_name").
	IntegrationName string    `json:"name"`
	// Provider is encoded as "provider".
	Provider        string    `json:"provider"`
	// Status is encoded as "status"; the inline note lists the expected values.
	Status          string    `json:"status"` // success, failed
	// MessageID is encoded as "message_id" and omitted from JSON when empty.
	MessageID       string    `json:"message_id,omitempty"`
	// Error is encoded as "error" and omitted from JSON when empty.
	Error           string    `json:"error,omitempty"`
	// SentAt is encoded as "sent_at".
	SentAt          time.Time `json:"sent_at"`
}

SendResult represents the result of sending to a single integration.

type Severity

// Severity represents the severity level of a notification.
// It is a string type; the defined values are the Severity* constants.
type Severity string

Severity represents the severity level of a notification.

// Severity levels for notifications.
//
// NOTE(review): the constants appear to be listed from most to least severe,
// but Severity is a plain string type, so no ordering is enforced here.
const (
	// SeverityCritical is the "critical" severity level.
	SeverityCritical Severity = "critical"
	// SeverityHigh is the "high" severity level.
	SeverityHigh     Severity = "high"
	// SeverityMedium is the "medium" severity level.
	SeverityMedium   Severity = "medium"
	// SeverityLow is the "low" severity level.
	SeverityLow      Severity = "low"
	// SeverityInfo is the "info" severity level.
	SeverityInfo     Severity = "info"
	// SeverityNone is the "none" severity level.
	SeverityNone     Severity = "none"
)

func (Severity) String

func (s Severity) String() string

String returns the string representation of the severity.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL