notify

package
v0.31.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 17, 2026 License: MIT Imports: 12 Imported by: 0

Documentation

Index

Constants

View Source
const (
	DigestWindow    = 30 * time.Second
	DigestThreshold = 5
	DigestMaxItems  = 50
)

Variables

View Source
var ErrChannelTypeUnknown = errors.New("未知通知通道类型")

Functions

func CombineDigest

func CombineDigest(items []DigestItem) (title, text string)

CombineDigest 将多个条目合并为单条消息。单条直接透传;多条生成编号摘要。 超过 DigestMaxItems 的部分会被截断并提示剩余数量。

func IsQuietNow

func IsQuietNow(now time.Time, start, end string) bool

IsQuietNow 判断 now 是否落入 [start, end) 的静默窗口(分钟粒度,不含秒)。 start/end 任一为空、格式非法、或两者相等时视为无静默。 当 start > end(如 22:00–08:00)窗口跨越午夜,等价于 [start,24:00) ∪ [00:00,end)。

func NextQuietEnd

func NextQuietEnd(now time.Time, end string) time.Time

NextQuietEnd 返回下一次静默结束时刻(>= now+1ns)。调用方应先确认当前处于静默时段。 当今天的 end 时刻已过,返回明天的 end 时刻;否则返回今天的 end 时刻。

func RegisterChannel

func RegisterChannel(typ string, factory Factory)

Types

type Button

type Button struct {
	Text         string `json:"text"`
	CallbackData string `json:"callback_data,omitempty"`
	URL          string `json:"url,omitempty"`
}

type Channel

type Channel interface {
	Type() string
	Init(ctx context.Context, conf *models.NotificationConf) error
	SupportsInbound() bool
	Send(ctx context.Context, n Notification) error
	OnInbound(handler InboundHandler)
	Close(ctx context.Context) error
	Healthy() bool
}

type ConfLister

type ConfLister interface {
	ListNotificationConfs(ctx context.Context) ([]models.NotificationConf, error)
}

ConfLister lists notification channel configurations available to the router.

type DedupCache

type DedupCache struct {
	// contains filtered or unexported fields
}

func NewDedupCache

func NewDedupCache(capacity int, window time.Duration) *DedupCache

func (*DedupCache) Seen

func (d *DedupCache) Seen(eventType, primaryID string, confID uint) bool

type DigestBuffer

type DigestBuffer struct {
	// contains filtered or unexported fields
}

DigestBuffer 按 NotificationConf ID 缓冲一段时间内的通知,达到 threshold 立即合并、 否则在 window 到期时合并刷写。

func NewDigestBuffer

func NewDigestBuffer(ctx context.Context, flush DigestFlushFunc) *DigestBuffer

func NewDigestBufferWithWindow

func NewDigestBufferWithWindow(ctx context.Context, flush DigestFlushFunc, window time.Duration, threshold int) *DigestBuffer

NewDigestBufferWithWindow 暴露 window/threshold 用于单测。

func (*DigestBuffer) Add

func (b *DigestBuffer) Add(confID uint, item DigestItem)

Add 入列一条 DigestItem。命中阈值立即异步刷写,否则在窗口到期时刷写。

func (*DigestBuffer) FlushAll

func (b *DigestBuffer) FlushAll()

FlushAll 强制刷写所有 pending 条目,用于优雅退出。

type DigestFlushFunc

type DigestFlushFunc func(ctx context.Context, confID uint, items []DigestItem)

DigestFlushFunc 在窗口结束(条数达阈值或定时器到期)时被调用,由实现方完成 实际推送以及 rss_notification_log 行的结果更新。

type DigestItem

type DigestItem struct {
	LogID uint
	Title string
	Text  string
}

DigestItem 是 DigestBuffer 内部的一条待合并通知。 LogID 指向触发该条目的 rss_notification_log 行 ID,flush 回调据此更新结果。

type Factory

type Factory func() Channel

type InboundHandler

type InboundHandler func(ctx context.Context, msg InboundMessage) error

type InboundMessage

type InboundMessage struct {
	ChannelType   string `json:"channel_type"`
	SourceConfID  uint   `json:"source_conf_id"`
	ChannelUserID string `json:"channel_user_id"`
	Username      string `json:"username"`
	ChatID        string `json:"chat_id"`
	MessageType   string `json:"message_type,omitempty"` // "private" | "group" — used by reply routing
	Text          string `json:"text"`
	IsCallback    bool   `json:"is_callback"`
	CallbackData  string `json:"callback_data"`
}

type Notification

type Notification struct {
	Title             string            `json:"title"`
	Text              string            `json:"text"`
	Image             string            `json:"image,omitempty"`
	Link              string            `json:"link,omitempty"`
	ChannelType       string            `json:"channel_type,omitempty"`
	SourceConfID      uint              `json:"source_conf_id,omitempty"`
	UserID            string            `json:"user_id,omitempty"`
	Targets           map[string]string `json:"targets,omitempty"`
	Buttons           [][]Button        `json:"buttons,omitempty"`
	DisableWebPreview bool              `json:"disable_web_preview,omitempty"`
	OriginalMessageID string            `json:"original_message_id,omitempty"`
}

type OutboxEnqueuer

type OutboxEnqueuer interface {
	Enqueue(ctx context.Context, confID uint, n Notification, errMsg string) error
}

OutboxEnqueuer persists failed deliveries for asynchronous retry.

type OutboxWorker

type OutboxWorker struct {
	// contains filtered or unexported fields
}

OutboxWorker scans pending notification_outbox rows and retries delivery with bounded exponential backoff.

func NewOutboxWorker

func NewOutboxWorker(db *gorm.DB, registry *Registry, interval time.Duration) *OutboxWorker

NewOutboxWorker creates a worker. interval defaults to 10s when <= 0.

func (*OutboxWorker) Start

func (w *OutboxWorker) Start(ctx context.Context)

Start launches one ticker goroutine. Repeated calls do not start additional workers.

func (*OutboxWorker) Stop

func (w *OutboxWorker) Stop()

Stop cancels the worker and waits up to 1s for the goroutine to exit.

func (*OutboxWorker) Tick

func (w *OutboxWorker) Tick(ctx context.Context) error

Tick performs one scan of due pending rows and attempts delivery.

type Registry

type Registry struct {
	// contains filtered or unexported fields
}

func DefaultRegistry

func DefaultRegistry() *Registry

func NewRegistry

func NewRegistry() *Registry

func (*Registry) Make

func (r *Registry) Make(typ string) (Channel, error)

func (*Registry) Register

func (r *Registry) Register(typ string, factory Factory)

func (*Registry) Types

func (r *Registry) Types() []string

type RouteScope

type RouteScope struct {
	ConfIDs    []uint
	EventType  string
	PrimaryID  string
	SkipDedupe bool
}

type Router

type Router struct {
	// contains filtered or unexported fields
}

Router fans a notification out to enabled channel configurations, while keeping per-configuration channel instances private and reusable.

func NewRouter

func NewRouter(registry *Registry, outbox OutboxEnqueuer, confs ConfLister) *Router

func (*Router) Route

func (r *Router) Route(ctx context.Context, n Notification, scope RouteScope) error

Directories

Path Synopsis
adapter
qq
telegram
Package telegram implements the notify.Channel adapter for Telegram bots using mymmrac/telego with long-polling.
Package telegram implements the notify.Channel adapter for Telegram bots using mymmrac/telego with long-polling.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL