Documentation
¶
Index ¶
- Variables
- type Config
- type Handler
- type HandlerConfig
- type HandlerOption
- type IdempotencyRepository
- type Message
- func (m *Message) AddStringDetail(key, value string)
- func (m *Message) Initialize(n Notification, receiverType string, ...)
- func (m *Message) MarkFailed(updatedAt time.Time, retryable bool, err error)
- func (m *Message) MarkPending(updatedAt time.Time)
- func (m *Message) MarkPublished(updatedAt time.Time)
- type MessageOption
- type MessageStatus
- type Notification
- type Notifier
- type Queuer
- type ReceiverService
- type RoutingMethod
- type Service
- func (s *Service) CheckAndInsertIdempotency(ctx context.Context, scope, key string) (uint64, error)
- func (ns *Service) DispatchToReceiver(ctx context.Context, n Notification, receiverID uint64) error
- func (ns *Service) DispatchToSubscribers(ctx context.Context, namespaceID uint64, n Notification) error
- func (s *Service) MarkIdempotencyAsSuccess(ctx context.Context, id uint64) error
- func (s *Service) RemoveIdempotencies(ctx context.Context, TTL time.Duration) error
- type SubscriptionService
Constants ¶
This section is empty.
Variables ¶
var ErrNoMessage = errors.New("no message found")
Functions ¶
This section is empty.
Types ¶
type Config ¶
type Config struct {
Queue queues.Config `mapstructure:"queue" yaml:"queue"`
MessageHandler HandlerConfig `mapstructure:"message_handler" yaml:"message_handler"`
DLQHandler HandlerConfig `mapstructure:"dlq_handler" yaml:"dlq_handler"`
}
type Handler ¶
type Handler struct {
// contains filtered or unexported fields
}
Handler is a process to handle message publishing
func NewHandler ¶
func NewHandler(cfg HandlerConfig, logger log.Logger, q Queuer, registry map[string]Notifier, opts ...HandlerOption) *Handler
NewHandler creates a new handler with the supported types of Notifiers
func (*Handler) MessageHandler ¶
MessageHandler is a function that handles dequeued messages
type HandlerConfig ¶
type HandlerConfig struct {
Enabled bool `mapstructure:"enabled" yaml:"enabled" default:"true"`
PollDuration time.Duration `mapstructure:"poll_duration" yaml:"poll_duration" default:"5s"`
ReceiverTypes []string `mapstructure:"receiver_types" yaml:"receiver_types"`
BatchSize int `mapstructure:"batch_size" yaml:"batch_size" default:"1"`
}
type HandlerOption ¶
type HandlerOption func(*Handler)
HandlerOption is an option to customize handler creation
func HandlerWithBatchSize ¶
func HandlerWithBatchSize(bs int) HandlerOption
HandlerWithBatchSize sets created handler with the specified batch size
func HandlerWithIdentifier ¶
func HandlerWithIdentifier(identifier string) HandlerOption
HandlerWithIdentifier sets created handler with the specified identifier
type IdempotencyRepository ¶ added in v0.5.6
type Message ¶
type Message struct {
ID string
Status MessageStatus
ReceiverType string
Configs map[string]interface{} // the datasource to build vendor-specific configs
Details map[string]interface{} // the datasource to build vendor-specific message
LastError string
MaxTries int
TryCount int
Retryable bool
ExpiredAt time.Time
CreatedAt time.Time
UpdatedAt time.Time
// contains filtered or unexported fields
}
Message is the model to be sent for a specific subscription's receiver
func (*Message) AddStringDetail ¶
AddStringDetail adds a custom kv string detail
func (*Message) Initialize ¶
func (m *Message) Initialize( n Notification, receiverType string, notificationConfigs map[string]interface{}, opts ...MessageOption, )
Initialize initializes the message with default values or the customized values
func (*Message) MarkFailed ¶
MarkFailed updates the message to the failed state
func (*Message) MarkPending ¶
MarkPending updates the message to the pending state
func (*Message) MarkPublished ¶
MarkPublished updates the message to the published state
type MessageOption ¶
type MessageOption func(*Message)
MessageOption provides ability to configure the message initialization
func InitWithCreateTime ¶
func InitWithCreateTime(timeNow time.Time) MessageOption
InitWithCreateTime initializes the message with custom create time
func InitWithExpiryDuration ¶
func InitWithExpiryDuration(dur time.Duration) MessageOption
InitWithExpiryDuration initializes the message with the specified expiry duration
func InitWithID ¶
func InitWithID(id string) MessageOption
InitWithID initializes the message with some ID
func InitWithMaxTries ¶
func InitWithMaxTries(mt int) MessageOption
InitWithMaxTries initializes the message with custom max tries
type MessageStatus ¶
type MessageStatus string
MessageStatus determines the state of the message
const ( DefaultMaxTries = 3 // additional details DetailsKeyRoutingMethod = "routing_method" MessageStatusEnqueued MessageStatus = "enqueued" MessageStatusFailed MessageStatus = "failed" MessageStatusPending MessageStatus = "pending" MessageStatusPublished MessageStatus = "published" )
func (MessageStatus) String ¶ added in v0.5.2
func (ms MessageStatus) String() string
type Notification ¶
type Notification struct {
ID string `json:"id"`
Data map[string]interface{} `json:"data"`
Labels map[string]string `json:"labels"`
ValidDurationString string `json:"valid_duration"`
Template string `json:"template"`
CreatedAt time.Time
}
Notification is a model of notification
type Notifier ¶
type Notifier interface {
PreHookQueueTransformConfigs(ctx context.Context, notificationConfigMap map[string]interface{}) (map[string]interface{}, error)
PostHookQueueTransformConfigs(ctx context.Context, notificationConfigMap map[string]interface{}) (map[string]interface{}, error)
GetSystemDefaultTemplate() string
Send(ctx context.Context, message Message) (bool, error)
}
type Queuer ¶
type Queuer interface {
Enqueue(ctx context.Context, ms ...Message) error
Dequeue(ctx context.Context, receiverTypes []string, batchSize int, handlerFn func(context.Context, []Message) error) error
SuccessCallback(ctx context.Context, ms Message) error
ErrorCallback(ctx context.Context, ms Message) error
Type() string
Cleanup(ctx context.Context, filter queues.FilterCleanup) error
Stop(ctx context.Context) error
}
type ReceiverService ¶
type RoutingMethod ¶
type RoutingMethod string
const ( RoutingMethodReceiver RoutingMethod = "receiver" RoutingMethodSubscribers RoutingMethod = "subscribers" )
func (RoutingMethod) String ¶
func (rm RoutingMethod) String() string
type Service ¶ added in v0.5.6
type Service struct {
// contains filtered or unexported fields
}
Service is a service for notification domain
func NewService ¶
func NewService( logger log.Logger, q Queuer, idempotencyRepository IdempotencyRepository, receiverService ReceiverService, subscriptionService SubscriptionService, notifierPlugins map[string]Notifier, ) *Service
NewService creates a new notification service
func (*Service) CheckAndInsertIdempotency ¶ added in v0.5.6
func (*Service) DispatchToReceiver ¶ added in v0.5.6
func (*Service) DispatchToSubscribers ¶ added in v0.5.6
func (*Service) MarkIdempotencyAsSuccess ¶ added in v0.5.6
type SubscriptionService ¶
type SubscriptionService interface {
MatchByLabels(ctx context.Context, namespaceID uint64, labels map[string]string) ([]subscription.Subscription, error)
}