Documentation
¶
Index ¶
- Constants
- Variables
- type CreateNotificationInput
- type ExternalNotifier
- type ListResult
- type Notification
- type NotificationFilter
- type NotificationSummary
- type PostgresRepository
- func (r *PostgresRepository) CreateBatch(ctx context.Context, userRecipients map[string]Recipient, ...) (int, error)
- func (r *PostgresRepository) Delete(ctx context.Context, id string) error
- func (r *PostgresRepository) DeleteAllRead(ctx context.Context, userID string) error
- func (r *PostgresRepository) DeleteOlderThan(ctx context.Context, before time.Time) (int64, error)
- func (r *PostgresRepository) DeleteReadOlderThan(ctx context.Context, before time.Time) (int64, error)
- func (r *PostgresRepository) EnforcePerUserLimit(ctx context.Context, maxPerUser int) (int64, error)
- func (r *PostgresRepository) Get(ctx context.Context, id string) (*Notification, error)
- func (r *PostgresRepository) GetSummary(ctx context.Context, userID string) (*NotificationSummary, error)
- func (r *PostgresRepository) List(ctx context.Context, filter NotificationFilter) ([]*Notification, int, error)
- func (r *PostgresRepository) ListWithCursor(ctx context.Context, filter NotificationFilter) (*ListResult, error)
- func (r *PostgresRepository) MarkAllAsRead(ctx context.Context, userID string) error
- func (r *PostgresRepository) MarkAllAsReadChunked(ctx context.Context, userID string, chunkSize int) error
- func (r *PostgresRepository) MarkAsRead(ctx context.Context, id string) error
- type Recipient
- type Repository
- type Service
- func (s *Service) Create(ctx context.Context, input CreateNotificationInput) error
- func (s *Service) CreateSync(ctx context.Context, input CreateNotificationInput) (int, error)
- func (s *Service) Delete(ctx context.Context, id, userID string) error
- func (s *Service) DeleteAllRead(ctx context.Context, userID string) error
- func (s *Service) Get(ctx context.Context, id string) (*Notification, error)
- func (s *Service) GetSummary(ctx context.Context, userID string) (*NotificationSummary, error)
- func (s *Service) List(ctx context.Context, filter NotificationFilter) (*ListResult, error)
- func (s *Service) MarkAllAsRead(ctx context.Context, userID string) error
- func (s *Service) MarkAsRead(ctx context.Context, id, userID string) error
- func (s *Service) QueueAssetChange(assetID, assetMRN, assetName, changeType string, owners []Recipient)
- func (s *Service) SetExternalNotifier(notifier ExternalNotifier)
- func (s *Service) Start(ctx context.Context)
- func (s *Service) Stop()
- type ServiceConfig
- type ServiceOption
- type TeamMembershipProvider
- type UserPreferencesProvider
Constants ¶
const ( RecipientTypeUser = "user" RecipientTypeTeam = "team" )
const ( TypeSystem = "system" TypeSchemaChange = "schema_change" TypeAssetChange = "asset_change" TypeTeamInvite = "team_invite" TypeMention = "mention" TypeJobComplete = "job_complete" TypeUpstreamSchemaChange = "upstream_schema_change" TypeDownstreamSchemaChange = "downstream_schema_change" TypeLineageChange = "lineage_change" )
Variables ¶
var ( ErrNotificationNotFound = errors.New("notification not found") )
Functions ¶
This section is empty.
Types ¶
type CreateNotificationInput ¶
type CreateNotificationInput struct {
Recipients []Recipient `json:"recipients" validate:"required,min=1"`
Type string `json:"type" validate:"required"`
Title string `json:"title" validate:"required,max=255"`
Message string `json:"message" validate:"required"`
Data map[string]interface{} `json:"data,omitempty"`
}
CreateNotificationInput is used by internal services to create notifications.
type ExternalNotifier ¶
type ExternalNotifier interface {
DispatchToTeam(ctx context.Context, teamID, notificationType, title, message string, data map[string]interface{})
}
ExternalNotifier dispatches notifications to external systems (webhooks).
type ListResult ¶
type ListResult struct {
Notifications []*Notification `json:"notifications"`
Total int `json:"total"`
NextCursor string `json:"next_cursor,omitempty"`
}
ListResult contains paginated notification results.
type Notification ¶
type Notification struct {
ID string `json:"id"`
UserID string `json:"user_id"`
RecipientType string `json:"recipient_type"`
RecipientID string `json:"recipient_id"`
Type string `json:"type"`
Title string `json:"title"`
Message string `json:"message"`
Data map[string]interface{} `json:"data,omitempty"`
Read bool `json:"read"`
ReadAt *time.Time `json:"read_at,omitempty"`
CreatedAt time.Time `json:"created_at"`
}
Notification represents a user notification.
type NotificationFilter ¶
type NotificationFilter struct {
UserID string
Type string
ReadOnly *bool
Cursor string
Limit int
Offset int
}
NotificationFilter specifies the filter and pagination criteria used when listing notifications.
type NotificationSummary ¶
type NotificationSummary struct {
UnreadCount int `json:"unread_count"`
TotalCount int `json:"total_count"`
}
NotificationSummary provides the unread and total notification counts for a user (e.g. for a UI badge).
type PostgresRepository ¶
type PostgresRepository struct {
// contains filtered or unexported fields
}
PostgresRepository implements Repository for PostgreSQL.
func (*PostgresRepository) CreateBatch ¶
func (r *PostgresRepository) CreateBatch(ctx context.Context, userRecipients map[string]Recipient, input CreateNotificationInput, batchSize int) (int, error)
CreateBatch creates notifications for multiple users in batches.
func (*PostgresRepository) Delete ¶
func (r *PostgresRepository) Delete(ctx context.Context, id string) error
Delete deletes a single notification.
func (*PostgresRepository) DeleteAllRead ¶
func (r *PostgresRepository) DeleteAllRead(ctx context.Context, userID string) error
DeleteAllRead deletes all read notifications for a user.
func (*PostgresRepository) DeleteOlderThan ¶
func (r *PostgresRepository) DeleteOlderThan(ctx context.Context, before time.Time) (int64, error)
DeleteOlderThan deletes notifications older than the specified time.
func (*PostgresRepository) DeleteReadOlderThan ¶
func (r *PostgresRepository) DeleteReadOlderThan(ctx context.Context, before time.Time) (int64, error)
DeleteReadOlderThan deletes read notifications created before the given time.
func (*PostgresRepository) EnforcePerUserLimit ¶
func (r *PostgresRepository) EnforcePerUserLimit(ctx context.Context, maxPerUser int) (int64, error)
EnforcePerUserLimit deletes the oldest notifications for users who exceed maxPerUser.
func (*PostgresRepository) Get ¶
func (r *PostgresRepository) Get(ctx context.Context, id string) (*Notification, error)
Get retrieves a single notification by ID.
func (*PostgresRepository) GetSummary ¶
func (r *PostgresRepository) GetSummary(ctx context.Context, userID string) (*NotificationSummary, error)
GetSummary returns unread/total count for a user.
func (*PostgresRepository) List ¶
func (r *PostgresRepository) List(ctx context.Context, filter NotificationFilter) ([]*Notification, int, error)
List retrieves notifications for a user with offset-based pagination.
func (*PostgresRepository) ListWithCursor ¶
func (r *PostgresRepository) ListWithCursor(ctx context.Context, filter NotificationFilter) (*ListResult, error)
ListWithCursor retrieves notifications using cursor-based pagination.
func (*PostgresRepository) MarkAllAsRead ¶
func (r *PostgresRepository) MarkAllAsRead(ctx context.Context, userID string) error
MarkAllAsRead marks all notifications for a user as read.
func (*PostgresRepository) MarkAllAsReadChunked ¶
func (r *PostgresRepository) MarkAllAsReadChunked(ctx context.Context, userID string, chunkSize int) error
MarkAllAsReadChunked marks notifications as read in chunks to avoid long locks.
func (*PostgresRepository) MarkAsRead ¶
func (r *PostgresRepository) MarkAsRead(ctx context.Context, id string) error
MarkAsRead marks a single notification as read.
type Repository ¶
type Repository interface {
CreateBatch(ctx context.Context, userRecipients map[string]Recipient, input CreateNotificationInput, batchSize int) (int, error)
Get(ctx context.Context, id string) (*Notification, error)
List(ctx context.Context, filter NotificationFilter) ([]*Notification, int, error)
ListWithCursor(ctx context.Context, filter NotificationFilter) (*ListResult, error)
GetSummary(ctx context.Context, userID string) (*NotificationSummary, error)
MarkAsRead(ctx context.Context, id string) error
MarkAllAsRead(ctx context.Context, userID string) error
MarkAllAsReadChunked(ctx context.Context, userID string, chunkSize int) error
Delete(ctx context.Context, id string) error
DeleteAllRead(ctx context.Context, userID string) error
DeleteOlderThan(ctx context.Context, before time.Time) (int64, error)
DeleteReadOlderThan(ctx context.Context, before time.Time) (int64, error)
EnforcePerUserLimit(ctx context.Context, maxPerUser int) (int64, error)
}
Repository defines the notification data access interface.
func NewPostgresRepository ¶
func NewPostgresRepository(db *pgxpool.Pool) Repository
NewPostgresRepository creates a new PostgreSQL notification repository.
type Service ¶
type Service struct {
// contains filtered or unexported fields
}
Service handles notification operations.
func NewService ¶
func NewService(repo Repository, teamProvider TeamMembershipProvider, opts ...ServiceOption) *Service
NewService creates a new notification service.
func (*Service) Create ¶
func (s *Service) Create(ctx context.Context, input CreateNotificationInput) error
Create queues notifications for the specified recipients.
func (*Service) CreateSync ¶
func (s *Service) CreateSync(ctx context.Context, input CreateNotificationInput) (int, error)
CreateSync creates notifications synchronously.
func (*Service) DeleteAllRead ¶
func (s *Service) DeleteAllRead(ctx context.Context, userID string) error
DeleteAllRead deletes all read notifications for a user.
func (*Service) GetSummary ¶
func (s *Service) GetSummary(ctx context.Context, userID string) (*NotificationSummary, error)
GetSummary returns unread/total count for a user.
func (*Service) List ¶
func (s *Service) List(ctx context.Context, filter NotificationFilter) (*ListResult, error)
List retrieves notifications for a user with filters.
func (*Service) MarkAllAsRead ¶
func (s *Service) MarkAllAsRead(ctx context.Context, userID string) error
MarkAllAsRead marks all notifications for a user as read.
func (*Service) MarkAsRead ¶
func (s *Service) MarkAsRead(ctx context.Context, id, userID string) error
MarkAsRead marks a single notification as read.
func (*Service) QueueAssetChange ¶
func (s *Service) QueueAssetChange(assetID, assetMRN, assetName, changeType string, owners []Recipient)
QueueAssetChange queues an asset change for aggregated notification. changeType should be TypeAssetChange or TypeSchemaChange.
func (*Service) SetExternalNotifier ¶
func (s *Service) SetExternalNotifier(notifier ExternalNotifier)
SetExternalNotifier sets the external notification dispatcher after service creation.
type ServiceConfig ¶
type ServiceConfig struct {
MaxWorkers int
QueueSize int
BatchSize int
PruneAge time.Duration
PruneReadAge time.Duration
PruneInterval time.Duration
MaxPerUser int
AggregateWindow time.Duration
AggregateMaxWait time.Duration
DisableAggregation bool
}
ServiceConfig configures the notification service.
type ServiceOption ¶
type ServiceOption func(*Service)
ServiceOption configures the notification service.
func WithConfig ¶
func WithConfig(config *ServiceConfig) ServiceOption
WithConfig sets the service configuration.
func WithDB ¶
func WithDB(db *pgxpool.Pool) ServiceOption
WithDB sets the database pool for singleton task coordination.
func WithExternalNotifier ¶
func WithExternalNotifier(notifier ExternalNotifier) ServiceOption
WithExternalNotifier sets the external notification dispatcher (webhooks) via option.
func WithUserPreferencesProvider ¶
func WithUserPreferencesProvider(provider UserPreferencesProvider) ServiceOption
WithUserPreferencesProvider sets the user preferences provider.
type TeamMembershipProvider ¶
type TeamMembershipProvider interface {
GetTeamMemberUserIDs(ctx context.Context, teamID string) ([]string, error)
}
TeamMembershipProvider provides team membership lookup for notification fan-out.
type UserPreferencesProvider ¶
type UserPreferencesProvider interface {
GetNotificationPreferences(ctx context.Context, userID string) (map[string]bool, error)
GetNotificationPreferencesBatch(ctx context.Context, userIDs []string) (map[string]map[string]bool, error)
}
UserPreferencesProvider provides user notification preferences.