notification

package
v0.7.0-preview3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 13, 2026 License: MIT Imports: 12 Imported by: 0

Documentation

Index

Constants

View Source
const (
	RecipientTypeUser = "user"
	RecipientTypeTeam = "team"
)
View Source
const (
	TypeSystem                 = "system"
	TypeSchemaChange           = "schema_change"
	TypeAssetChange            = "asset_change"
	TypeTeamInvite             = "team_invite"
	TypeMention                = "mention"
	TypeJobComplete            = "job_complete"
	TypeUpstreamSchemaChange   = "upstream_schema_change"
	TypeDownstreamSchemaChange = "downstream_schema_change"
	TypeLineageChange          = "lineage_change"
	TypeAssetDeleted           = "asset_deleted"
)
View Source
const (
	MaxBatchSize     = 500
	DefaultBatchSize = 100

	DefaultPruneAge         = 90 * 24 * time.Hour
	DefaultPruneReadAge     = 14 * 24 * time.Hour
	DefaultPruneInterval    = 24 * time.Hour
	DefaultMaxPerUser       = 500
	DefaultAggregateWindow  = 2 * time.Minute
	DefaultAggregateMaxWait = 5 * time.Minute
)

Variables

View Source
var (
	ErrNotificationNotFound = errors.New("notification not found")
	ErrUnauthorized         = errors.New("unauthorized to access notification")
)

Functions

This section is empty.

Types

type CreateNotificationInput

type CreateNotificationInput struct {
	Recipients []Recipient            `json:"recipients" validate:"required,min=1"`
	Type       string                 `json:"type" validate:"required"`
	Title      string                 `json:"title" validate:"required,max=255"`
	Message    string                 `json:"message" validate:"required"`
	Data       map[string]interface{} `json:"data,omitempty"`
}

CreateNotificationInput is used by internal services to create notifications.

type ExternalNotifier

type ExternalNotifier interface {
	DispatchToTeam(ctx context.Context, teamID, notificationType, title, message string, data map[string]interface{})
}

ExternalNotifier dispatches notifications to external systems (webhooks).

type ListResult

type ListResult struct {
	Notifications []*Notification `json:"notifications"`
	Total         int             `json:"total"`
	NextCursor    string          `json:"next_cursor,omitempty"`
}

ListResult contains paginated notification results.

type Notification

type Notification struct {
	ID            string                 `json:"id"`
	UserID        string                 `json:"user_id"`
	RecipientType string                 `json:"recipient_type"`
	RecipientID   string                 `json:"recipient_id"`
	Type          string                 `json:"type"`
	Title         string                 `json:"title"`
	Message       string                 `json:"message"`
	Data          map[string]interface{} `json:"data,omitempty"`
	Read          bool                   `json:"read"`
	ReadAt        *time.Time             `json:"read_at,omitempty"`
	CreatedAt     time.Time              `json:"created_at"`
}

Notification represents a user notification.

type NotificationFilter

type NotificationFilter struct {
	UserID   string
	Type     string
	ReadOnly *bool
	Cursor   string
	Limit    int
	Offset   int
}

NotificationFilter for listing notifications.

type NotificationSummary

type NotificationSummary struct {
	UnreadCount int `json:"unread_count"`
	TotalCount  int `json:"total_count"`
}

NotificationSummary provides unread count for UI badge.

type PostgresRepository

type PostgresRepository struct {
	// contains filtered or unexported fields
}

PostgresRepository implements Repository for PostgreSQL.

func (*PostgresRepository) CreateBatch

func (r *PostgresRepository) CreateBatch(ctx context.Context, userRecipients map[string]Recipient, input CreateNotificationInput, batchSize int) (int, error)

CreateBatch creates notifications for multiple users in batches.

func (*PostgresRepository) Delete

func (r *PostgresRepository) Delete(ctx context.Context, id string) error

Delete deletes a single notification.

func (*PostgresRepository) DeleteAllRead

func (r *PostgresRepository) DeleteAllRead(ctx context.Context, userID string) error

DeleteAllRead deletes all read notifications for a user.

func (*PostgresRepository) DeleteOlderThan

func (r *PostgresRepository) DeleteOlderThan(ctx context.Context, before time.Time) (int64, error)

DeleteOlderThan deletes notifications older than the specified time.

func (*PostgresRepository) DeleteReadOlderThan

func (r *PostgresRepository) DeleteReadOlderThan(ctx context.Context, before time.Time) (int64, error)

DeleteReadOlderThan deletes read notifications created before the given time.

func (*PostgresRepository) EnforcePerUserLimit

func (r *PostgresRepository) EnforcePerUserLimit(ctx context.Context, maxPerUser int) (int64, error)

EnforcePerUserLimit deletes the oldest notifications for users who exceed maxPerUser.

func (*PostgresRepository) Get

Get retrieves a single notification by ID.

func (*PostgresRepository) GetSummary

func (r *PostgresRepository) GetSummary(ctx context.Context, userID string) (*NotificationSummary, error)

GetSummary returns unread/total count for a user.

func (*PostgresRepository) List

List retrieves notifications for a user with offset-based pagination.

func (*PostgresRepository) ListWithCursor

func (r *PostgresRepository) ListWithCursor(ctx context.Context, filter NotificationFilter) (*ListResult, error)

ListWithCursor retrieves notifications using cursor-based pagination.

func (*PostgresRepository) MarkAllAsRead

func (r *PostgresRepository) MarkAllAsRead(ctx context.Context, userID string) error

MarkAllAsRead marks all notifications for a user as read.

func (*PostgresRepository) MarkAllAsReadChunked

func (r *PostgresRepository) MarkAllAsReadChunked(ctx context.Context, userID string, chunkSize int) error

MarkAllAsReadChunked marks notifications as read in chunks to avoid long locks.

func (*PostgresRepository) MarkAsRead

func (r *PostgresRepository) MarkAsRead(ctx context.Context, id string) error

MarkAsRead marks a single notification as read.

type Recipient

type Recipient struct {
	Type string `json:"type"`
	ID   string `json:"id"`
}

Recipient represents a notification target.

type Repository

type Repository interface {
	CreateBatch(ctx context.Context, userRecipients map[string]Recipient, input CreateNotificationInput, batchSize int) (int, error)
	Get(ctx context.Context, id string) (*Notification, error)
	List(ctx context.Context, filter NotificationFilter) ([]*Notification, int, error)
	ListWithCursor(ctx context.Context, filter NotificationFilter) (*ListResult, error)
	GetSummary(ctx context.Context, userID string) (*NotificationSummary, error)
	MarkAsRead(ctx context.Context, id string) error
	MarkAllAsRead(ctx context.Context, userID string) error
	MarkAllAsReadChunked(ctx context.Context, userID string, chunkSize int) error
	Delete(ctx context.Context, id string) error
	DeleteAllRead(ctx context.Context, userID string) error
	DeleteOlderThan(ctx context.Context, before time.Time) (int64, error)
	DeleteReadOlderThan(ctx context.Context, before time.Time) (int64, error)
	EnforcePerUserLimit(ctx context.Context, maxPerUser int) (int64, error)
}

Repository defines the notification data access interface.

func NewPostgresRepository

func NewPostgresRepository(db *pgxpool.Pool) Repository

NewPostgresRepository creates a new PostgreSQL notification repository.

type Service

type Service struct {
	// contains filtered or unexported fields
}

Service handles notification operations.

func NewService

func NewService(repo Repository, teamProvider TeamMembershipProvider, opts ...ServiceOption) *Service

NewService creates a new notification service.

func (*Service) Create

func (s *Service) Create(ctx context.Context, input CreateNotificationInput) error

Create queues notifications for the specified recipients.

func (*Service) CreateSync

func (s *Service) CreateSync(ctx context.Context, input CreateNotificationInput) (int, error)

CreateSync creates notifications synchronously.

func (*Service) Delete

func (s *Service) Delete(ctx context.Context, id, userID string) error

Delete deletes a notification.

func (*Service) DeleteAllRead

func (s *Service) DeleteAllRead(ctx context.Context, userID string) error

DeleteAllRead deletes all read notifications for a user.

func (*Service) Get

func (s *Service) Get(ctx context.Context, id string) (*Notification, error)

Get retrieves a single notification by ID.

func (*Service) GetSummary

func (s *Service) GetSummary(ctx context.Context, userID string) (*NotificationSummary, error)

GetSummary returns unread/total count for a user.

func (*Service) List

func (s *Service) List(ctx context.Context, filter NotificationFilter) (*ListResult, error)

List retrieves notifications for a user with filters.

func (*Service) MarkAllAsRead

func (s *Service) MarkAllAsRead(ctx context.Context, userID string) error

MarkAllAsRead marks all notifications for a user as read.

func (*Service) MarkAsRead

func (s *Service) MarkAsRead(ctx context.Context, id, userID string) error

MarkAsRead marks a single notification as read.

func (*Service) QueueAssetChange

func (s *Service) QueueAssetChange(assetID, assetMRN, assetName, changeType string, owners []Recipient)

QueueAssetChange queues an asset change for aggregated notification. changeType should be TypeAssetChange or TypeSchemaChange.

func (*Service) SetExternalNotifier

func (s *Service) SetExternalNotifier(notifier ExternalNotifier)

SetExternalNotifier sets the external notification dispatcher after service creation.

func (*Service) Start

func (s *Service) Start(ctx context.Context)

Start begins background processing.

func (*Service) Stop

func (s *Service) Stop()

Stop gracefully shuts down the service.

type ServiceConfig

type ServiceConfig struct {
	MaxWorkers         int
	QueueSize          int
	BatchSize          int
	PruneAge           time.Duration
	PruneReadAge       time.Duration
	PruneInterval      time.Duration
	MaxPerUser         int
	AggregateWindow    time.Duration
	AggregateMaxWait   time.Duration
	DisableAggregation bool
}

ServiceConfig configures the notification service.

type ServiceOption

type ServiceOption func(*Service)

ServiceOption configures the notification service.

func WithConfig

func WithConfig(config *ServiceConfig) ServiceOption

WithConfig sets the service configuration.

func WithDB

func WithDB(db *pgxpool.Pool) ServiceOption

WithDB sets the database pool for singleton task coordination.

func WithExternalNotifier

func WithExternalNotifier(notifier ExternalNotifier) ServiceOption

WithExternalNotifier sets the external notification dispatcher (webhooks) via option.

func WithUserPreferencesProvider

func WithUserPreferencesProvider(provider UserPreferencesProvider) ServiceOption

WithUserPreferencesProvider sets the user preferences provider.

type TeamMembershipProvider

type TeamMembershipProvider interface {
	GetTeamMemberUserIDs(ctx context.Context, teamID string) ([]string, error)
}

TeamMembershipProvider provides team membership lookup for notification fan-out.

type UserPreferencesProvider

type UserPreferencesProvider interface {
	GetNotificationPreferences(ctx context.Context, userID string) (map[string]bool, error)
	GetNotificationPreferencesBatch(ctx context.Context, userIDs []string) (map[string]map[string]bool, error)
}

UserPreferencesProvider provides user notification preferences.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL