Documentation
¶
Index ¶
- Constants
- Variables
- type AlertRepository
- type Config
- type Deps
- type Dispatcher
- type Filter
- type Handler
- type HandlerConfig
- type HandlerOption
- type Idempotency
- type IdempotencyFilter
- type IdempotencyRepository
- type LogService
- type Message
- func (m *Message) MarkExpired(updatedAt time.Time, err error)
- func (m *Message) MarkFailed(updatedAt time.Time, retryable bool, err error)
- func (m *Message) MarkPending(updatedAt time.Time)
- func (m *Message) MarkPublished(updatedAt time.Time)
- func (m *Message) ToV1beta1Proto() (*sirenv1beta1.NotificationMessage, error)
- type MessageConfig
- type MessageOption
- type MessageStatus
- type MetaMessage
- type Notification
- type Notifier
- type Queuer
- type ReceiverSelectors
- type ReceiverService
- type Repository
- type Router
- type RouterReceiverService
- type RouterSubscriberService
- type Service
- func (s *Service) CheckIdempotency(ctx context.Context, scope, key string) (string, error)
- func (s *Service) Dispatch(ctx context.Context, ns []Notification) ([]string, error)
- func (s *Service) InsertIdempotency(ctx context.Context, scope, key, notificationID string) error
- func (s *Service) List(ctx context.Context, flt Filter) ([]Notification, error)
- func (s *Service) ListNotificationMessages(ctx context.Context, notificationID string) ([]Message, error)
- func (s *Service) PrepareMessages(ctx context.Context, metaMessages []MetaMessage) ([]Message, error)
- func (s *Service) RemoveIdempotencies(ctx context.Context, TTL time.Duration) error
- func (s *Service) RenderMessages(ctx context.Context, metaMessages []MetaMessage) (messages []Message, err error)
- type SilenceService
- type SubscriptionService
- type TemplateService
- type Transactor
Constants ¶
const ( ValidDurationRequestKey string = "valid_duration" RouterReceiver string = "receiver" RouterSubscriber string = "subscriber" TypeAlert string = "alert" TypeEvent string = "event" DispatchKindBulkNotification = "bulk_notification" DispatchKindSingleNotification = "single_notification" )
Variables ¶
var ( ErrNoMessage = errors.New("no message sent, probably because not matching any subscription or receiver") ErrRouteSubscriberNoMatchFound = errors.New("not matching any subscription") )
Functions ¶
This section is empty.
Types ¶
type AlertRepository ¶ added in v0.7.6
type Config ¶
type Config struct {
MaxNumReceiverSelectors int `mapstructure:"max_num_receiver_selectors" yaml:"max_num_receiver_selectors" default:"10"`
MaxMessagesReceiverFlow int `mapstructure:"max_messages_receiver_flow" yaml:"max_messages_receiver_flow" default:"10"`
Queue queues.Config `mapstructure:"queue" yaml:"queue"`
MessageHandler HandlerConfig `mapstructure:"message_handler" yaml:"message_handler"`
DLQHandler HandlerConfig `mapstructure:"dlq_handler" yaml:"dlq_handler"`
GroupBy []string `mapstructure:"group_by" yaml:"group_by"`
Message MessageConfig `mapstructure:"message" yaml:"message"`
}
type Deps ¶
type Deps struct {
Cfg Config
Logger saltlog.Logger
Repository Repository
Q Queuer
IdempotencyRepository IdempotencyRepository
AlertRepository AlertRepository
LogService LogService
ReceiverService ReceiverService
TemplateService TemplateService
SubscriptionService SubscriptionService
SilenceService SilenceService
}
type Dispatcher ¶
type Dispatcher interface {
Dispatch(ctx context.Context, ns []Notification) ([]string, error)
}
type Handler ¶
type Handler struct {
// contains filtered or unexported fields
}
Handler is a process to handle message publishing
func NewHandler ¶
func NewHandler(cfg HandlerConfig, logger log.Logger, q Queuer, registry map[string]Notifier, opts ...HandlerOption) *Handler
NewHandler creates a new handler with some supported type of Notifiers
func (*Handler) MessageHandler ¶
MessageHandler is a function to handler dequeued message
type HandlerConfig ¶
type HandlerConfig struct {
Enabled bool `mapstructure:"enabled" yaml:"enabled" default:"true"`
PollDuration time.Duration `mapstructure:"poll_duration" yaml:"poll_duration" default:"5s"`
ReceiverTypes []string `mapstructure:"receiver_types" yaml:"receiver_types"`
BatchSize int `mapstructure:"batch_size" yaml:"batch_size" default:"1"`
}
type HandlerOption ¶
type HandlerOption func(*Handler)
HandlerOption is an option to customize handler creation
func HandlerWithBatchSize ¶
func HandlerWithBatchSize(bs int) HandlerOption
HandlerWithBatchSize sets created handler with the specified batch size
func HandlerWithIdentifier ¶
func HandlerWithIdentifier(identifier string) HandlerOption
HandlerWithIdentifier sets created handler with the specified batch size
type Idempotency ¶
type IdempotencyFilter ¶
type IdempotencyRepository ¶
type IdempotencyRepository interface {
Create(ctx context.Context, scope, key, notificationID string) (*Idempotency, error)
Check(ctx context.Context, scope, key string) (*Idempotency, error)
Delete(context.Context, IdempotencyFilter) error
}
type LogService ¶
type LogService interface {
LogNotifications(ctx context.Context, nlogs ...log.Notification) error
}
type Message ¶
type Message struct {
ID string
NotificationIDs []string
Status MessageStatus
ReceiverType string
Configs map[string]any // the datasource to build vendor-specific configs
Details map[string]any // the datasource to build vendor-specific message
MaxTries int
ExpiredAt time.Time
CreatedAt time.Time
UpdatedAt time.Time
LastError string
TryCount int
Retryable bool
// contains filtered or unexported fields
}
Message is the model to be sent for a specific subscription's receiver
func InitMessage ¶
func InitMessage( ctx context.Context, notifierPlugin Notifier, templateService TemplateService, n Notification, receiverType string, messageConfig map[string]any, opts ...MessageOption, ) (Message, error)
Initialize initializes the message with some default value or the customized value
func InitMessageByMetaMessage ¶ added in v0.7.6
func InitMessageByMetaMessage( ctx context.Context, cfg Config, notifierPlugin Notifier, templateService TemplateService, mm MetaMessage, opts ...MessageOption, ) (Message, error)
func (*Message) MarkExpired ¶ added in v0.8.6
MarkExpired update message to the expired state
func (*Message) MarkFailed ¶
MarkFailed update message to the failed state
func (*Message) MarkPending ¶
MarkPending update message to the pending state
func (*Message) MarkPublished ¶
MarkPublished update message to the published state
func (*Message) ToV1beta1Proto ¶ added in v0.7.3
func (m *Message) ToV1beta1Proto() (*sirenv1beta1.NotificationMessage, error)
type MessageConfig ¶ added in v0.8.6
type MessageConfig struct {
VerboseEnabled bool `mapstructure:"verbose_enabled" yaml:"verbose_enabled" default:"false"`
}
type MessageOption ¶
type MessageOption func(*Message)
MessageOption provides ability to configure the message initialization
func InitWithCreateTime ¶
func InitWithCreateTime(timeNow time.Time) MessageOption
InitWithCreateTime initializes the message with custom create time
func InitWithExpiryDuration ¶
func InitWithExpiryDuration(dur time.Duration) MessageOption
InitWithExpiryDuration initializes the message with the specified expiry duration
func InitWithID ¶
func InitWithID(id string) MessageOption
InitWithID initializes the message with some ID
func InitWithMaxTries ¶
func InitWithMaxTries(mt int) MessageOption
InitWithMaxTries initializes the message with custom max tries
type MessageStatus ¶
type MessageStatus string
MessageStatus determines the state of the message
const ( // additional details DetailsKeyNotificationType = "notification_type" MessageStatusEnqueued MessageStatus = "enqueued" MessageStatusFailed MessageStatus = "failed" MessageStatusPending MessageStatus = "pending" MessageStatusPublished MessageStatus = "published" MessageStatusExpired MessageStatus = "expired" )
func (MessageStatus) String ¶
func (ms MessageStatus) String() string
type MetaMessage ¶ added in v0.7.6
type MetaMessage struct {
ReceiverID uint64
SubscriptionIDs []uint64
ReceiverType string
NotificationIDs []string
NotificationType string
ReceiverConfigs map[string]any
Data map[string]any
ValidDuration time.Duration
Template string
Labels map[string]string
MergedLabels map[string][]string
UniqueKey string
}
func MergeMetaMessage ¶ added in v0.7.6
func MergeMetaMessage(from MetaMessage, to MetaMessage) MetaMessage
func ReduceMetaMessages ¶ added in v0.7.6
func ReduceMetaMessages(metaMessages []MetaMessage, groupBy []string) ([]MetaMessage, error)
type Notification ¶
type Notification struct {
ID string `json:"id"`
NamespaceID uint64 `json:"namespace_id"`
Type string `json:"type"`
Data map[string]any `json:"data"`
Labels map[string]string `json:"labels"`
ValidDuration time.Duration `json:"valid_duration"`
Template string `json:"template"`
UniqueKey string `json:"unique_key"`
ReceiverSelectors ReceiverSelectors `json:"receiver_selectors"`
CreatedAt time.Time `json:"created_at"`
// won't be stored in notification table, only to propagate this to notification_subscriber
AlertIDs []int64
}
Notification is a model of notification
func (*Notification) EnrichID ¶
func (n *Notification) EnrichID(id string)
func (Notification) MetaMessage ¶ added in v0.7.6
func (n Notification) MetaMessage(receiverView subscription.ReceiverView) MetaMessage
func (Notification) Validate ¶
func (n Notification) Validate(routerKind string) error
type Notifier ¶
type Notifier interface {
PreHookQueueTransformConfigs(ctx context.Context, notificationConfigMap map[string]any) (map[string]any, error)
PostHookQueueTransformConfigs(ctx context.Context, notificationConfigMap map[string]any) (map[string]any, error)
GetSystemDefaultTemplate() string
Send(ctx context.Context, message Message) (bool, error)
PostProcessMessage(MetaMessage, *Message) *Message
}
type Queuer ¶
type Queuer interface {
ListMessages(ctx context.Context, notificationID string) ([]Message, error)
Enqueue(ctx context.Context, ms ...Message) error
Dequeue(ctx context.Context, receiverTypes []string, batchSize int, handlerFn func(context.Context, []Message) error) error
SuccessCallback(ctx context.Context, ms Message) error
ErrorCallback(ctx context.Context, ms Message) error
Cleanup(ctx context.Context, filter queues.FilterCleanup) error
Stop(ctx context.Context) error
}
type ReceiverSelectors ¶ added in v0.8.4
type ReceiverService ¶
type Repository ¶
type Repository interface {
Transactor
BulkCreate(context.Context, []Notification) ([]Notification, error)
Create(context.Context, Notification) (Notification, error)
List(context.Context, Filter) ([]Notification, error)
}
type Router ¶ added in v0.7.6
type Router interface {
PrepareMetaMessages(ctx context.Context, n Notification) (metaMessages []MetaMessage, notificationLogs []log.Notification, err error)
}
type RouterReceiverService ¶ added in v0.7.6
type RouterReceiverService struct {
// contains filtered or unexported fields
}
func NewRouterReceiverService ¶ added in v0.7.6
func NewRouterReceiverService( deps Deps, ) *RouterReceiverService
func (*RouterReceiverService) PrepareMetaMessages ¶ added in v0.7.6
func (s *RouterReceiverService) PrepareMetaMessages(ctx context.Context, n Notification) (metaMessages []MetaMessage, notificationLogs []log.Notification, err error)
type RouterSubscriberService ¶ added in v0.7.6
type RouterSubscriberService struct {
// contains filtered or unexported fields
}
func NewRouterSubscriberService ¶ added in v0.7.6
func NewRouterSubscriberService( deps Deps, ) *RouterSubscriberService
func (*RouterSubscriberService) PrepareMetaMessages ¶ added in v0.7.6
func (s *RouterSubscriberService) PrepareMetaMessages(ctx context.Context, n Notification) (metaMessages []MetaMessage, notificationLogs []log.Notification, err error)
type Service ¶
type Service struct {
// contains filtered or unexported fields
}
Service is a service for notification domain
func NewService ¶
func NewService( deps Deps, routerMap map[string]Router, notifierPlugins map[string]Notifier, ) *Service
NewService creates a new notification service
func (*Service) CheckIdempotency ¶ added in v0.7.0
func (*Service) InsertIdempotency ¶ added in v0.7.0
func (*Service) ListNotificationMessages ¶ added in v0.7.3
func (*Service) PrepareMessages ¶ added in v0.7.8
func (*Service) RemoveIdempotencies ¶
func (*Service) RenderMessages ¶ added in v0.7.8
type SilenceService ¶
type SubscriptionService ¶
type SubscriptionService interface {
MatchByLabelsV2(ctx context.Context, namespaceID uint64, labels map[string]string) ([]subscription.ReceiverView, error)
}