Documentation ¶
Index ¶
- Constants
- Variables
- func CombineDigest(items []DigestItem) (title, text string)
- func IsQuietNow(now time.Time, start, end string) bool
- func NextQuietEnd(now time.Time, end string) time.Time
- func RegisterChannel(typ string, factory Factory)
- type Button
- type Channel
- type ConfLister
- type DedupCache
- type DigestBuffer
- type DigestFlushFunc
- type DigestItem
- type Factory
- type InboundHandler
- type InboundMessage
- type Notification
- type OutboxEnqueuer
- type OutboxWorker
- type Registry
- type RouteScope
- type Router
Constants ¶
const (
	DigestWindow    = 30 * time.Second
	DigestThreshold = 5
	DigestMaxItems  = 50
)
Variables ¶
var ErrChannelTypeUnknown = errors.New("未知通知通道类型")
Functions ¶
func CombineDigest ¶
func CombineDigest(items []DigestItem) (title, text string)
CombineDigest 将多个条目合并为单条消息。单条直接透传;多条生成编号摘要。 超过 DigestMaxItems 的部分会被截断并提示剩余数量。
func IsQuietNow ¶
func IsQuietNow(now time.Time, start, end string) bool
IsQuietNow 判断 now 是否落入 [start, end) 的静默窗口(分钟粒度,不含秒)。start/end 任一为空、格式非法、或两者相等时视为无静默。当 start > end(如 22:00–08:00)时,窗口跨越午夜,等价于 [start,24:00) ∪ [00:00,end)。
func NextQuietEnd ¶
func NextQuietEnd(now time.Time, end string) time.Time
NextQuietEnd 返回下一次静默结束时刻(严格晚于 now,即 >= now+1ns)。调用方应先确认当前处于静默时段。当今天的 end 时刻已过,返回明天的 end 时刻;否则返回今天的 end 时刻。
func RegisterChannel ¶
func RegisterChannel(typ string, factory Factory)
Types ¶
type ConfLister ¶
type ConfLister interface {
ListNotificationConfs(ctx context.Context) ([]models.NotificationConf, error)
}
ConfLister lists notification channel configurations available to the router.
type DedupCache ¶
type DedupCache struct {
// contains filtered or unexported fields
}
func NewDedupCache ¶
func NewDedupCache(capacity int, window time.Duration) *DedupCache
type DigestBuffer ¶
type DigestBuffer struct {
// contains filtered or unexported fields
}
DigestBuffer 按 NotificationConf ID 缓冲一段时间内的通知,达到 threshold 立即合并刷写,否则在 window 到期时合并刷写。
func NewDigestBuffer ¶
func NewDigestBuffer(ctx context.Context, flush DigestFlushFunc) *DigestBuffer
func NewDigestBufferWithWindow ¶
func NewDigestBufferWithWindow(ctx context.Context, flush DigestFlushFunc, window time.Duration, threshold int) *DigestBuffer
NewDigestBufferWithWindow 暴露 window/threshold 用于单测。
func (*DigestBuffer) Add ¶
func (b *DigestBuffer) Add(confID uint, item DigestItem)
Add 入列一条 DigestItem。命中阈值立即异步刷写,否则在窗口到期时刷写。
func (*DigestBuffer) FlushAll ¶
func (b *DigestBuffer) FlushAll()
FlushAll 强制刷写所有 pending 条目,用于优雅退出。
type DigestFlushFunc ¶
type DigestFlushFunc func(ctx context.Context, confID uint, items []DigestItem)
DigestFlushFunc 在窗口结束(条数达阈值或定时器到期)时被调用,由实现方完成实际推送以及 rss_notification_log 行的结果更新。
type DigestItem ¶
DigestItem 是 DigestBuffer 内部的一条待合并通知。 LogID 指向触发该条目的 rss_notification_log 行 ID,flush 回调据此更新结果。
type InboundHandler ¶
type InboundHandler func(ctx context.Context, msg InboundMessage) error
type InboundMessage ¶
type InboundMessage struct {
ChannelType string `json:"channel_type"`
SourceConfID uint `json:"source_conf_id"`
ChannelUserID string `json:"channel_user_id"`
Username string `json:"username"`
ChatID string `json:"chat_id"`
MessageType string `json:"message_type,omitempty"` // "private" | "group" — used by reply routing
Text string `json:"text"`
IsCallback bool `json:"is_callback"`
CallbackData string `json:"callback_data"`
}
type Notification ¶
type Notification struct {
Title string `json:"title"`
Text string `json:"text"`
Image string `json:"image,omitempty"`
Link string `json:"link,omitempty"`
ChannelType string `json:"channel_type,omitempty"`
SourceConfID uint `json:"source_conf_id,omitempty"`
UserID string `json:"user_id,omitempty"`
Targets map[string]string `json:"targets,omitempty"`
Buttons [][]Button `json:"buttons,omitempty"`
DisableWebPreview bool `json:"disable_web_preview,omitempty"`
OriginalMessageID string `json:"original_message_id,omitempty"`
}
type OutboxEnqueuer ¶
type OutboxEnqueuer interface {
Enqueue(ctx context.Context, confID uint, n Notification, errMsg string) error
}
OutboxEnqueuer persists failed deliveries for asynchronous retry.
type OutboxWorker ¶
type OutboxWorker struct {
// contains filtered or unexported fields
}
OutboxWorker scans pending notification_outbox rows and retries delivery with bounded exponential backoff.
func NewOutboxWorker ¶
NewOutboxWorker creates a worker. The interval defaults to 10s when it is <= 0.
func (*OutboxWorker) Start ¶
func (w *OutboxWorker) Start(ctx context.Context)
Start launches one ticker goroutine. Repeated calls do not start additional workers.
func (*OutboxWorker) Stop ¶
func (w *OutboxWorker) Stop()
Stop cancels the worker and waits up to 1s for the goroutine to exit.
type Registry ¶
type Registry struct {
// contains filtered or unexported fields
}
func DefaultRegistry ¶
func DefaultRegistry() *Registry
func NewRegistry ¶
func NewRegistry() *Registry
type RouteScope ¶
type Router ¶
type Router struct {
// contains filtered or unexported fields
}
Router fans a notification out to enabled channel configurations, while keeping per-configuration channel instances private and reusable.
func NewRouter ¶
func NewRouter(registry *Registry, outbox OutboxEnqueuer, confs ConfLister) *Router
func (*Router) Route ¶
func (r *Router) Route(ctx context.Context, n Notification, scope RouteScope) error