outbox

package
v0.2.0
Warning

This package is not in the latest version of its module.

Published: Apr 23, 2026 License: GPL-3.0 Imports: 14 Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type EnqueueParams

type EnqueueParams struct {
	TenantID      shared.ID
	EventType     string     // e.g., "new_finding", "scan_completed"
	AggregateType string     // e.g., "finding", "scan"
	AggregateID   *uuid.UUID // ID of the source entity
	Title         string
	Body          string
	Severity      string // critical, high, medium, low, info
	URL           string
	Metadata      map[string]any
}

EnqueueParams contains parameters for enqueuing a notification.
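
A minimal sketch of filling in these parameters for a "new_finding" event. The ID and UUID types below are local stand-ins for shared.ID and uuid.UUID, whose definitions are not shown on this page, and validSeverity is a hypothetical helper based only on the values listed in the Severity comment; the package does not document such a function.

```go
package main

import "fmt"

// ID and UUID are local stand-ins (assumptions) for shared.ID and uuid.UUID.
type ID string
type UUID [16]byte

// EnqueueParams mirrors the documented struct field for field.
type EnqueueParams struct {
	TenantID      ID
	EventType     string
	AggregateType string
	AggregateID   *UUID
	Title         string
	Body          string
	Severity      string
	URL           string
	Metadata      map[string]any
}

// validSeverity is a hypothetical helper: it accepts only the five values
// named in the Severity field comment.
func validSeverity(s string) bool {
	switch s {
	case "critical", "high", "medium", "low", "info":
		return true
	}
	return false
}

// newFindingParams builds parameters for a "new_finding" event on a finding.
func newFindingParams(tenant ID, findingID UUID, title string) EnqueueParams {
	return EnqueueParams{
		TenantID:      tenant,
		EventType:     "new_finding",
		AggregateType: "finding",
		AggregateID:   &findingID,
		Title:         title,
		Severity:      "high",
		Metadata:      map[string]any{"source": "example"},
	}
}

func main() {
	p := newFindingParams("tenant-1", UUID{1}, "SQL injection in /login")
	fmt.Println(p.EventType, p.AggregateType, p.Severity, validSeverity(p.Severity))
}
```

AggregateID is a pointer, so it can presumably be left nil for events that have no source entity.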

type Scheduler

type Scheduler struct {
	// contains filtered or unexported fields
}

Scheduler runs periodic notification processing tasks.

func NewScheduler

func NewScheduler(service *Service, config SchedulerConfig, log *logger.Logger) *Scheduler

NewScheduler creates a new notification scheduler.

func (*Scheduler) Start

func (s *Scheduler) Start()

Start starts the notification scheduler.

func (*Scheduler) Stop

func (s *Scheduler) Stop()

Stop stops the notification scheduler gracefully.
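
The Scheduler's internals are unexported, so the following is only a sketch of one common way to build a Start/Stop pair in Go: a ticker-driven goroutine that Stop signals and then waits for. The tickerLoop type and the runUntil helper are hypothetical names for illustration and are not part of this package.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// tickerLoop is a hypothetical stand-in for a periodic scheduler.
type tickerLoop struct {
	interval time.Duration
	task     func()
	stop     chan struct{}
	once     sync.Once
	wg       sync.WaitGroup
}

func newTickerLoop(interval time.Duration, task func()) *tickerLoop {
	return &tickerLoop{interval: interval, task: task, stop: make(chan struct{})}
}

// Start launches the background goroutine and returns immediately.
func (l *tickerLoop) Start() {
	l.wg.Add(1)
	go func() {
		defer l.wg.Done()
		t := time.NewTicker(l.interval)
		defer t.Stop()
		for {
			select {
			case <-t.C:
				l.task()
			case <-l.stop:
				return
			}
		}
	}()
}

// Stop signals the goroutine and blocks until any in-flight task has returned.
// It is safe to call more than once.
func (l *tickerLoop) Stop() {
	l.once.Do(func() { close(l.stop) })
	l.wg.Wait()
}

// runUntil starts a loop with a 1ms interval, waits for n task runs, stops the
// loop, and returns the number of runs recorded once Stop has returned.
func runUntil(n int) int {
	count := 0
	done := make(chan struct{})
	l := newTickerLoop(time.Millisecond, func() {
		count++
		if count == n {
			close(done)
		}
	})
	l.Start()
	<-done
	l.Stop()
	return count
}

func main() {
	fmt.Println(runUntil(3) >= 3)
}
```

Waiting on the WaitGroup inside Stop is what makes the shutdown graceful in this sketch: the caller regains control only after the current task has finished.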

type SchedulerConfig

type SchedulerConfig struct {
	// ProcessInterval is how often to process outbox entries (default: 5 seconds).
	ProcessInterval time.Duration
	// CleanupInterval is how often to clean up old entries (default: 24 hours).
	CleanupInterval time.Duration
	// UnlockInterval is how often to unlock stale entries (default: 1 minute).
	UnlockInterval time.Duration
	// BatchSize is the number of entries to process per batch (default: 50).
	BatchSize int
	// CompletedRetentionDays is how long to keep completed outbox entries (default: 7).
	// Note: With the new architecture, completed entries are deleted immediately after archiving.
	// This is kept for backward compatibility and for entries that failed to archive.
	CompletedRetentionDays int
	// FailedRetentionDays is how long to keep failed outbox entries (default: 30).
	FailedRetentionDays int
	// EventRetentionDays is how long to keep archived notification events (default: 90).
	// Set to 0 for unlimited retention.
	EventRetentionDays int
	// StaleMinutes is how long before a locked entry is considered stale (default: 10).
	StaleMinutes int
}

SchedulerConfig holds configuration for the notification scheduler.

func DefaultSchedulerConfig

func DefaultSchedulerConfig() SchedulerConfig

DefaultSchedulerConfig returns the default configuration.
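
As an illustration, the sketch below mirrors SchedulerConfig locally and fills it with the defaults stated in the field comments, then overrides two values. The documentedDefaults function is a hypothetical stand-in; whether DefaultSchedulerConfig returns exactly these values is an assumption based on those comments.

```go
package main

import (
	"fmt"
	"time"
)

// SchedulerConfig mirrors the documented struct.
type SchedulerConfig struct {
	ProcessInterval        time.Duration
	CleanupInterval        time.Duration
	UnlockInterval         time.Duration
	BatchSize              int
	CompletedRetentionDays int
	FailedRetentionDays    int
	EventRetentionDays     int
	StaleMinutes           int
}

// documentedDefaults returns the defaults named in the field comments.
func documentedDefaults() SchedulerConfig {
	return SchedulerConfig{
		ProcessInterval:        5 * time.Second,
		CleanupInterval:        24 * time.Hour,
		UnlockInterval:         time.Minute,
		BatchSize:              50,
		CompletedRetentionDays: 7,
		FailedRetentionDays:    30,
		EventRetentionDays:     90,
		StaleMinutes:           10,
	}
}

func main() {
	cfg := documentedDefaults()
	cfg.BatchSize = 200        // larger batches for a busy tenant
	cfg.EventRetentionDays = 0 // 0 means unlimited retention
	fmt.Println(cfg.ProcessInterval, cfg.BatchSize, cfg.EventRetentionDays)
}
```

Starting from the defaults and overriding individual fields avoids accidentally zeroing an interval, which a hand-built struct literal would do for any field left out.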

type Service

type Service struct {
	// contains filtered or unexported fields
}

Service handles notification outbox operations.

func NewService

func NewService(
	outboxRepo outboxdom.OutboxRepository,
	eventRepo outboxdom.EventRepository,
	notificationRepo integration.NotificationExtensionRepository,
	credentialDecrypt func(string) (string, error),
	log *slog.Logger,
) *Service

NewService creates a new Service.

func (*Service) CleanupOldEntries

func (s *Service) CleanupOldEntries(ctx context.Context, completedDays, failedDays int) (deletedCompleted, deletedFailed int64, err error)

CleanupOldEntries removes old completed and failed entries.

func (*Service) CleanupOldEvents

func (s *Service) CleanupOldEvents(ctx context.Context, retentionDays int) (deleted int64, err error)

CleanupOldEvents removes notification events older than the specified retention days. If retentionDays <= 0, no deletion is performed (unlimited retention).
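
The retention rule above can be sketched as a cutoff computation. The retentionCutoff and fixedNow functions are hypothetical; how the package actually derives its cutoff (calendar days versus 24-hour periods, time zone) is not documented here, so the use of UTC calendar days is an assumption.

```go
package main

import (
	"fmt"
	"time"
)

// retentionCutoff returns the timestamp before which events would be deleted.
// The boolean is false when retentionDays <= 0, meaning unlimited retention
// and therefore no deletion.
func retentionCutoff(now time.Time, retentionDays int) (time.Time, bool) {
	if retentionDays <= 0 {
		return time.Time{}, false
	}
	return now.AddDate(0, 0, -retentionDays), true
}

// fixedNow returns a fixed reference time so the example is deterministic.
func fixedNow() time.Time {
	return time.Date(2026, time.April, 23, 0, 0, 0, 0, time.UTC)
}

func main() {
	cutoff, ok := retentionCutoff(fixedNow(), 90)
	fmt.Println(cutoff.Format("2006-01-02"), ok)

	_, ok = retentionCutoff(fixedNow(), 0)
	fmt.Println(ok)
}
```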

func (*Service) Enqueue

func (s *Service) Enqueue(ctx context.Context, params EnqueueParams) error

Enqueue creates an outbox entry for a notification. This should be called within the same transaction as the business event.

func (*Service) EnqueueInTx

func (s *Service) EnqueueInTx(ctx context.Context, tx *sql.Tx, params EnqueueParams) error

EnqueueInTx creates an outbox entry within an existing transaction. This is the preferred method for the transactional outbox pattern.
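
To illustrate the transactional outbox pattern mentioned above: the business write and the outbox write share one transaction, so they commit or roll back together. This is a sketch using only database/sql; the table names, the execer interface, fakeTx, and the helper functions are all hypothetical. In real code the second statement would presumably be replaced by a call to EnqueueInTx(ctx, tx, params).

```go
package main

import (
	"context"
	"database/sql"
	"errors"
	"fmt"
)

// execer is the subset of *sql.Tx used below, so a fake can stand in for it.
type execer interface {
	ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error)
}

var _ execer = (*sql.Tx)(nil) // compile-time check that *sql.Tx satisfies execer

// writeWithOutbox stores a finding and its notification in the same transaction.
// Table and column names are assumptions for illustration.
func writeWithOutbox(ctx context.Context, tx execer, title string) error {
	if _, err := tx.ExecContext(ctx,
		`INSERT INTO findings (title) VALUES ($1)`, title); err != nil {
		return fmt.Errorf("insert finding: %w", err)
	}
	if _, err := tx.ExecContext(ctx,
		`INSERT INTO notification_outbox (event_type, title) VALUES ($1, $2)`,
		"new_finding", title); err != nil {
		return fmt.Errorf("enqueue notification: %w", err)
	}
	return nil
}

// runInTx commits only if fn succeeds; otherwise both writes are rolled back.
func runInTx(ctx context.Context, db *sql.DB, fn func(tx *sql.Tx) error) error {
	tx, err := db.BeginTx(ctx, nil)
	if err != nil {
		return err
	}
	defer tx.Rollback() // changes nothing once Commit has succeeded
	if err := fn(tx); err != nil {
		return err
	}
	return tx.Commit()
}

// fakeTx records statements and can simulate a failure on the Nth one.
type fakeTx struct {
	queries []string
	failOn  int // 1-based statement index that fails; 0 means never
}

func (f *fakeTx) ExecContext(_ context.Context, q string, _ ...any) (sql.Result, error) {
	f.queries = append(f.queries, q)
	if f.failOn == len(f.queries) {
		return nil, errors.New("simulated failure")
	}
	return nil, nil
}

// demo runs writeWithOutbox against a fakeTx and reports how many statements
// were attempted.
func demo(failOn int) (statements int, err error) {
	tx := &fakeTx{failOn: failOn}
	err = writeWithOutbox(context.Background(), tx, "SQL injection in /login")
	return len(tx.queries), err
}

func main() {
	n, err := demo(0)
	fmt.Println(n, err)
	n, err = demo(1)
	fmt.Println(n, err != nil)
}
```

With a real database, a caller would pass a closure to runInTx that performs the business write and then enqueues the notification on the same tx.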

func (*Service) ProcessOutboxBatch

func (s *Service) ProcessOutboxBatch(ctx context.Context, workerID string, batchSize int) (processed, failed int, err error)

ProcessOutboxBatch processes a batch of pending outbox entries.
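
A rough sketch of what batch processing with processed and failed counters can look like. The entry type, the deliver callback, and failOdd are hypothetical, and the sketch assumes that processed counts only successful deliveries; the package does not state whether its processed count includes failures.

```go
package main

import (
	"errors"
	"fmt"
)

// entry is a hypothetical, simplified outbox row.
type entry struct {
	ID    int
	Title string
}

// processBatch attempts delivery for at most batchSize pending entries and
// counts successes and failures separately.
func processBatch(pending []entry, batchSize int, deliver func(entry) error) (processed, failed int) {
	if batchSize < 0 {
		batchSize = 0
	}
	if batchSize < len(pending) {
		pending = pending[:batchSize]
	}
	for _, e := range pending {
		if err := deliver(e); err != nil {
			failed++
			continue
		}
		processed++
	}
	return processed, failed
}

// failOdd is a demo deliverer that fails for odd entry IDs.
func failOdd(e entry) error {
	if e.ID%2 == 1 {
		return errors.New("delivery failed")
	}
	return nil
}

func main() {
	pending := []entry{{ID: 1}, {ID: 2}, {ID: 3}, {ID: 4}}
	processed, failed := processBatch(pending, 3, failOdd)
	fmt.Println(processed, failed)
}
```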

func (*Service) UnlockStaleEntries

func (s *Service) UnlockStaleEntries(ctx context.Context, olderThanMinutes int) (unlocked int64, err error)

UnlockStaleEntries releases locks on stale processing entries.
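
As a sketch of the staleness rule: an entry that is still locked and whose lock is older than olderThanMinutes has its lock released. The lockedEntry type and the in-memory loop below are hypothetical; the real method presumably performs this as a repository query rather than in application code.

```go
package main

import (
	"fmt"
	"time"
)

// lockedEntry is a hypothetical, simplified outbox row with lock metadata.
type lockedEntry struct {
	ID       int
	LockedBy string // empty when the entry is not locked
	LockedAt time.Time
}

// unlockStale clears locks taken more than olderThanMinutes before now and
// returns how many entries were unlocked.
func unlockStale(entries []lockedEntry, now time.Time, olderThanMinutes int) (unlocked int64) {
	cutoff := now.Add(-time.Duration(olderThanMinutes) * time.Minute)
	for i := range entries {
		e := &entries[i]
		if e.LockedBy != "" && e.LockedAt.Before(cutoff) {
			e.LockedBy = ""
			e.LockedAt = time.Time{}
			unlocked++
		}
	}
	return unlocked
}

// fixedNow returns a fixed reference time so the example is deterministic.
func fixedNow() time.Time {
	return time.Date(2026, time.April, 23, 12, 0, 0, 0, time.UTC)
}

// sampleEntries returns one fresh lock, one stale lock, and one unlocked entry.
func sampleEntries(now time.Time) []lockedEntry {
	return []lockedEntry{
		{ID: 1, LockedBy: "worker-a", LockedAt: now.Add(-2 * time.Minute)},
		{ID: 2, LockedBy: "worker-b", LockedAt: now.Add(-30 * time.Minute)},
		{ID: 3},
	}
}

func main() {
	now := fixedNow()
	entries := sampleEntries(now)
	fmt.Println(unlockStale(entries, now, 10), entries[0].LockedBy, entries[1].LockedBy == "")
}
```

Releasing stale locks lets another worker pick up entries whose original worker crashed or timed out while processing them.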
