Documentation
Functions
func ConstructSSETopic (added in v2.0.3)
ConstructSSETopic derives a topic from the request URL — path on its own, or path with the query string appended when one is present.
e.g. "/api/v3/device/all/sse?offset=10&labels=label1,label2"
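The derivation rule can be sketched with the standard library alone. The helper topicFromRawURL below is hypothetical; it is not the package's ConstructSSETopic, whose signature is not shown on this page. It only illustrates "path, plus the query string when one is present".

```go
package main

import (
	"fmt"
	"net/url"
)

// topicFromRawURL is a hypothetical stand-in (not the package's function).
// It mirrors the documented rule: the topic is the URL path, with "?" and
// the raw query appended when a query string is present.
func topicFromRawURL(raw string) (string, error) {
	u, err := url.Parse(raw)
	if err != nil {
		return "", err
	}
	if u.RawQuery == "" {
		return u.Path, nil
	}
	return u.Path + "?" + u.RawQuery, nil
}

func main() {
	for _, raw := range []string{
		"/api/v3/device/all/sse",
		"/api/v3/device/all/sse?offset=10&labels=label1,label2",
	} {
		topic, err := topicFromRawURL(raw)
		if err != nil {
			fmt.Println("parse error:", err)
			continue
		}
		fmt.Println(topic)
	}
}
```

Under this rule, two requests that differ only in their query string land on different topics, so each distinct filter gets its own stream.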
func Handler
func Handler(m *Manager, opts ...HandlerOption) echo.HandlerFunc
Handler returns an echo.HandlerFunc that opens an SSE stream for the caller. It atomically subscribes to the topic (creating the topic's broadcaster on demand) and forwards every published event to the response.
Options:
- WithCustomTopic: override the auto-derived topic (defaults to the request URL path + query).
- WithPollingService: attach a polling service that produces events for the topic. The service starts on the first subscribe and stops when the last subscriber leaves.
Types
type HandlerConfig
type HandlerConfig struct {
	PollingService PollingService
	CustomTopic    string
}
HandlerConfig holds the configuration for the SSE handler.
type HandlerOption
type HandlerOption func(*HandlerConfig)
HandlerOption is a function that modifies the HandlerConfig.
func WithCustomTopic (added in v2.0.5)
func WithCustomTopic(topic string) HandlerOption
WithCustomTopic returns a HandlerOption that sets a custom topic in the HandlerConfig.
func WithPollingService
func WithPollingService(service PollingService) HandlerOption
WithPollingService returns a HandlerOption that sets the PollingService in the HandlerConfig.
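HandlerConfig and HandlerOption follow the functional-options pattern. The sketch below shows how such options are typically folded into a config. All lowercase names are local stand-ins declared for illustration, and applyOptions is an assumption about how a handler might consume its options, not the package's actual code.

```go
package main

import "fmt"

// pollingService is a local stand-in for the package's PollingService.
type pollingService interface{ Stop() error }

// handlerConfig mirrors the documented HandlerConfig fields.
type handlerConfig struct {
	PollingService pollingService
	CustomTopic    string
}

// handlerOption mirrors the documented HandlerOption shape.
type handlerOption func(*handlerConfig)

func withCustomTopic(topic string) handlerOption {
	return func(c *handlerConfig) { c.CustomTopic = topic }
}

func withPollingService(s pollingService) handlerOption {
	return func(c *handlerConfig) { c.PollingService = s }
}

// applyOptions (hypothetical) starts from the zero config and applies each
// option in order, so a later option overrides an earlier one.
func applyOptions(opts ...handlerOption) handlerConfig {
	var cfg handlerConfig
	for _, opt := range opts {
		opt(&cfg)
	}
	return cfg
}

func main() {
	cfg := applyOptions(withCustomTopic("devices"))
	fmt.Printf("topic=%q polling=%v\n", cfg.CustomTopic, cfg.PollingService != nil)
}
```

An empty CustomTopic in the resulting config is what lets a handler fall back to the topic derived from the request URL.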
type Manager
type Manager struct {
	// contains filtered or unexported fields
}
Manager owns one fan-out broadcaster per topic. It is the only public surface that external code interacts with for SSE pubsub — subscribers connect through sse.Handler and publishers push via Manager.Publish.
m.mu guards the broadcasters map: lookup, install, and removal are serialized so a Publish can never see a half-torn-down topic. Fan-out to individual subscribers is guarded by the broadcaster's own b.mu, which Publish acquires after releasing m.mu.
func NewManager
NewManager creates an SSE Manager that lives for the supplied parent context. The heartbeatInterval is applied to every handler the Manager serves; pass zero or a negative value to fall back to the package default.
func (*Manager) Publish (added in v2.0.19)
Publish forwards data to every subscriber currently registered on topic. When no broadcaster exists for the topic (no subscriber has connected, or the last one left and cleanup has run), the call returns without doing anything — matching the standard fire-and-forget semantics of in-memory pubsub (Redis pubsub, NATS, in-memory event buses). Callers ship the event on a best-effort basis; SSE does not guarantee delivery.
Safe to call at high frequency: the no-subscriber path is a single map lookup under a read lock.
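The two-level locking and the cheap no-subscriber path can be sketched as follows. This is a simplified model under stated assumptions, not the Manager's real implementation: the type and method names are hypothetical, publish returns a count only so the behaviour is observable, and dropping events for a full subscriber channel is an assumed policy.

```go
package main

import (
	"fmt"
	"sync"
)

// broadcaster fans data out to its subscribers; its own mutex guards subs.
type broadcaster struct {
	mu   sync.Mutex
	subs []chan any
}

// manager maps topics to broadcasters; mu guards only the map.
type manager struct {
	mu           sync.RWMutex
	broadcasters map[string]*broadcaster
}

// subscribe registers a buffered channel on topic, creating the broadcaster
// on demand. m.mu is held throughout so lookup and install are serialized.
func (m *manager) subscribe(topic string) <-chan any {
	ch := make(chan any, 8)
	m.mu.Lock()
	defer m.mu.Unlock()
	b, ok := m.broadcasters[topic]
	if !ok {
		b = &broadcaster{}
		m.broadcasters[topic] = b
	}
	b.mu.Lock()
	b.subs = append(b.subs, ch)
	b.mu.Unlock()
	return ch
}

// publish offers data to every subscriber on topic and returns how many
// subscribers were registered. With no broadcaster it is one map lookup
// under the read lock.
func (m *manager) publish(topic string, data any) int {
	m.mu.RLock()
	b, ok := m.broadcasters[topic]
	m.mu.RUnlock()
	if !ok {
		return 0
	}
	b.mu.Lock() // acquired only after m.mu has been released
	defer b.mu.Unlock()
	for _, ch := range b.subs {
		select {
		case ch <- data:
		default: // assumed policy: drop for a slow subscriber (best effort)
		}
	}
	return len(b.subs)
}

func main() {
	m := &manager{broadcasters: make(map[string]*broadcaster)}
	fmt.Println(m.publish("devices", "ignored")) // nobody is subscribed yet
	ch := m.subscribe("devices")
	fmt.Println(m.publish("devices", "hello"))
	fmt.Println(<-ch)
}
```

Releasing the map lock before taking the broadcaster lock keeps a slow fan-out on one topic from blocking lookups on every other topic.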
type Polling
type Polling struct {
	// contains filtered or unexported fields
}
Polling fetches data from a data source at regular intervals. It is designed to be started once and can be stopped gracefully.
func NewPolling
func NewPolling(lc log.Logger, pollingFunc func(context.Context) (any, error), opts ...PollingOption) *Polling
NewPolling builds a Polling that calls pollingFunc every interval and publishes the result to subscribers.
pollingFunc receives a ctx that is cancelled when Stop is called. pollingFunc should watch for that cancellation; otherwise Stop blocks until pollingFunc returns on its own.
Good (passes ctx down to the network call):
	func fetch(ctx context.Context) (any, error) {
		req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
		if err != nil {
			return nil, err
		}
		return http.DefaultClient.Do(req)
	}
Bad (ignores ctx, so a slow server makes Stop block):
	func fetch(ctx context.Context) (any, error) {
		return http.Get(url) // no ctx
	}
type PollingConfig (added in v2.0.8)
type PollingOption (added in v2.0.8)
type PollingOption func(*PollingConfig)
PollingOption is a function that modifies the PollingConfig.
func WithCustomApiVersion (added in v2.0.8)
func WithCustomApiVersion(apiVersion string) PollingOption
WithCustomApiVersion returns a PollingOption that sets a custom API version in the PollingConfig. This version is reported in the error response when polling fails. When unset, it defaults to common.ApiVersion from go-mod-edge-utils.
func WithCustomPollingInterval (added in v2.0.8)
func WithCustomPollingInterval(interval time.Duration) PollingOption
WithCustomPollingInterval returns a PollingOption that sets a custom polling interval in the PollingConfig. When unset, the interval defaults to 5 seconds.
func WithStopCallback (added in v2.0.16)
func WithStopCallback(fn func()) PollingOption
WithStopCallback returns a PollingOption that sets a callback to be invoked when polling stops. The callback is called regardless of the reason polling stopped (stop condition met, explicit Stop(), or context cancellation).
func WithStopCondition (added in v2.0.16)
func WithStopCondition(fn func(any) bool) PollingOption
WithStopCondition returns a PollingOption that sets a stop condition function in the PollingConfig. Polling will stop after publishing data if the function returns true.
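How the interval, stop condition, and stop callback interact can be sketched as a plain ticker loop. pollLoop and runUntil are hypothetical helpers written for this illustration; they are not the package's implementation, and skipping a failed poll is an assumption (the real Polling reports an error response instead).

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// pollLoop (hypothetical) calls fetch every interval, publishes each result,
// and exits when ctx is cancelled or stopWhen reports true for published data.
func pollLoop(ctx context.Context, interval time.Duration,
	fetch func(context.Context) (any, error),
	publish func(any), stopWhen func(any) bool, onStop func()) {
	defer onStop() // runs on every exit path, like the stop callback
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			data, err := fetch(ctx)
			if err != nil {
				continue // assumption: this sketch simply skips failed polls
			}
			publish(data)
			if stopWhen(data) { // checked after publishing, as documented
				return
			}
		}
	}
}

// runUntil polls a counter every millisecond and stops once it reaches limit.
func runUntil(limit int) (published []int, stopped bool) {
	n := 0
	fetch := func(context.Context) (any, error) { n++; return n, nil }
	publish := func(v any) { published = append(published, v.(int)) }
	stopWhen := func(v any) bool { return v.(int) >= limit }
	pollLoop(context.Background(), time.Millisecond,
		fetch, publish, stopWhen, func() { stopped = true })
	return published, stopped
}

func main() {
	published, stopped := runUntil(3)
	fmt.Println(published, stopped)
}
```

Because the condition is evaluated after publishing, the value that triggers the stop is still delivered to subscribers.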
type PollingService
type PollingService interface {
	// Start begins fetching. Called once per broadcaster, on the first
	// subscribe.
	Start(publisher Publisher)
	// Stop ends fetching. Called in a background goroutine when the last
	// subscriber leaves; Manager does not wait for it. If Stop never
	// returns, that goroutine leaks.
	//
	// Stop may also be called when Start was never called, so guard
	// against nil fields.
	//
	// See NewPolling for the bundled implementation and an example.
	Stop() error
}
PollingService periodically fetches data and publishes it to subscribers. One PollingService is attached to a broadcaster when the first subscriber connects (via WithPollingService).
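A custom service has to tolerate Stop arriving before Start and must return from Stop promptly. The sketch below shows one way to meet both requirements. The publisher interface is an ASSUMED shape, since the real Publisher type is not shown in this section, and tickService, countingPublisher, and runBriefly are hypothetical names.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// publisher is an ASSUMED shape; the package's Publisher is not shown here.
type publisher interface{ Publish(data any) }

// tickService (hypothetical) publishes the current time every millisecond.
type tickService struct {
	mu     sync.Mutex
	cancel context.CancelFunc // nil until Start runs
	done   chan struct{}
}

func (s *tickService) Start(p publisher) {
	ctx, cancel := context.WithCancel(context.Background())
	done := make(chan struct{})
	s.mu.Lock()
	s.cancel, s.done = cancel, done
	s.mu.Unlock()
	go func() {
		defer close(done)
		t := time.NewTicker(time.Millisecond)
		defer t.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case now := <-t.C:
				p.Publish(now)
			}
		}
	}()
}

// Stop is safe to call even when Start never ran: the nil check is the guard.
func (s *tickService) Stop() error {
	s.mu.Lock()
	cancel, done := s.cancel, s.done
	s.mu.Unlock()
	if cancel == nil {
		return nil
	}
	cancel()
	<-done // returns promptly because the loop watches ctx
	return nil
}

// countingPublisher counts how many events it was handed.
type countingPublisher struct {
	mu sync.Mutex
	n  int
}

func (c *countingPublisher) Publish(any) { c.mu.Lock(); c.n++; c.mu.Unlock() }

// runBriefly starts the service, lets it tick for 50ms, then stops it.
func runBriefly() (int, error) {
	var svc tickService
	pub := &countingPublisher{}
	svc.Start(pub)
	time.Sleep(50 * time.Millisecond)
	err := svc.Stop()
	pub.mu.Lock()
	defer pub.mu.Unlock()
	return pub.n, err
}

func main() {
	var idle tickService
	fmt.Println("stop before start:", idle.Stop())
	n, err := runBriefly()
	fmt.Println("published at least once:", n > 0, "stop error:", err)
}
```

Waiting on done inside Stop is a design choice for this sketch; it is only safe because the goroutine observes ctx and therefore exits quickly.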