sse

package
v1.20.0
Warning

This package is not in the latest version of its module.

Published: Jun 9, 2026 License: GPL-2.0 Imports: 19 Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type ActivityPayload

type ActivityPayload struct {
	Type     string          `json:"type"`
	Activity *activity.Event `json:"activity,omitempty"`
}

ActivityPayload is the message envelope for qui-owned server activity events. It is intentionally distinct from StreamPayload (whose Data is a torrent response) so that the frontend's torrent-stream router never sees activity events. Instead, they are delivered as a separate named "activity" SSE event and handled by a dedicated handler that invalidates cached queries.
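To illustrate what a separate named "activity" event looks like on the wire, here is a minimal sketch that frames a payload as a named SSE event. It does not use this package's internals: Event is a hypothetical stand-in for activity.Event (whose fields are not shown here), formatSSE is an illustrative helper, and the Type value "activity" is an assumption.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Event is a hypothetical stand-in for activity.Event; its real fields are
// not part of this documentation.
type Event struct {
	Kind string `json:"kind"`
}

// ActivityPayload mirrors the documented envelope, with Event swapped in.
type ActivityPayload struct {
	Type     string `json:"type"`
	Activity *Event `json:"activity,omitempty"`
}

// formatSSE renders one named SSE frame: an "event:" line, a "data:" line,
// and the blank line that terminates the frame.
func formatSSE(name string, payload any) (string, error) {
	b, err := json.Marshal(payload)
	if err != nil {
		return "", err
	}
	return fmt.Sprintf("event: %s\ndata: %s\n\n", name, b), nil
}

func main() {
	// The Type value and Event contents are assumptions for illustration.
	frame, err := formatSSE("activity", ActivityPayload{
		Type:     "activity",
		Activity: &Event{Kind: "example"},
	})
	if err != nil {
		panic(err)
	}
	fmt.Print(frame)
}
```

Because the frame carries its own event name, a browser EventSource would dispatch it only to listeners registered for "activity", which is what keeps it away from the default message handler.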

type StreamManager

type StreamManager struct {
	// contains filtered or unexported fields
}

StreamManager owns the SSE server and keeps subscriptions in sync with qBittorrent updates.

Lock hierarchy (acquire in this order to prevent deadlock):

  1. m.mu (StreamManager.mu) - protects subscriptions, groups, loops
  2. group.mu (subscriptionGroup.mu) - protects pending queue state
  3. group.subsMu (subscriptionGroup.subsMu) - protects subscriber list
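The ordering above can be sketched as follows. This is not the package's code: manager, subscriptionGroup, and addSubscriber are simplified stand-ins, and only the three mutex names come from the documentation. A real implementation would likely hold each lock for a shorter time; the point here is only the acquisition order.

```go
package main

import (
	"fmt"
	"sync"
)

// subscriptionGroup is a simplified stand-in. Only the names mu and subsMu
// come from the documented lock hierarchy.
type subscriptionGroup struct {
	mu      sync.Mutex // level 2: pending queue state
	pending int
	subsMu  sync.Mutex // level 3: subscriber list
	subs    []string
}

// manager is a simplified stand-in for StreamManager.
type manager struct {
	mu     sync.Mutex // level 1: subscriptions, groups, loops
	groups map[string]*subscriptionGroup
}

// addSubscriber always acquires m.mu, then group.mu, then group.subsMu.
// Taking them in any other order in another code path could deadlock.
func (m *manager) addSubscriber(key, id string) int {
	m.mu.Lock()
	defer m.mu.Unlock()
	g, ok := m.groups[key]
	if !ok {
		g = &subscriptionGroup{}
		m.groups[key] = g
	}

	g.mu.Lock()
	defer g.mu.Unlock()
	g.pending++

	g.subsMu.Lock()
	defer g.subsMu.Unlock()
	g.subs = append(g.subs, id)
	return len(g.subs)
}

func main() {
	m := &manager{groups: make(map[string]*subscriptionGroup)}
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			m.addSubscriber("view", fmt.Sprintf("sub-%d", n))
		}(i)
	}
	wg.Wait()
	fmt.Println(len(m.groups["view"].subs))
}
```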

func NewStreamManager

func NewStreamManager(clientPool *qbittorrent.ClientPool, syncManager syncProvider, instanceStore *models.InstanceStore) *StreamManager

NewStreamManager constructs a manager with a configured SSE server.

func (*StreamManager) HandleMainData

func (m *StreamManager) HandleMainData(instanceID int, data *qbt.MainData)

HandleMainData implements qbittorrent.SyncEventSink.

func (*StreamManager) HandleSyncError

func (m *StreamManager) HandleSyncError(instanceID int, err error)

HandleSyncError implements qbittorrent.SyncEventSink.

func (*StreamManager) PrepareBatch

func (m *StreamManager) PrepareBatch(ctx context.Context, requests []streamRequest) (context.Context, []string, error)

PrepareBatch registers one or more subscribers and returns a context that carries their session ids.

func (*StreamManager) Serve

func (m *StreamManager) Serve(w http.ResponseWriter, r *http.Request)

Serve implements the HTTP handler for GET /stream and multiplexes multiple subscriptions over one SSE session.
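For readers consuming the stream outside a browser, the sketch below splits a raw SSE body into frames using the standard event-stream rules (comment lines start with ":", a blank line ends a frame, the default event name is "message"). The frame type and parseFrames helper are hypothetical and not part of this package, and the sample input is made up.

```go
package main

import (
	"bufio"
	"fmt"
	"strings"
)

// frame is one parsed SSE event (hypothetical helper type).
type frame struct {
	Event string
	Data  string
}

// parseFrames splits an SSE body into frames. Note that bufio.Scanner's
// default limit caps a single line at 64 KiB.
func parseFrames(stream string) ([]frame, error) {
	var out []frame
	cur := frame{Event: "message"}
	hasData := false
	sc := bufio.NewScanner(strings.NewReader(stream))
	for sc.Scan() {
		line := sc.Text()
		switch {
		case line == "":
			// A blank line dispatches the frame if it carried any data.
			if hasData {
				out = append(out, cur)
			}
			cur, hasData = frame{Event: "message"}, false
		case strings.HasPrefix(line, ":"):
			// Comment line, commonly used as a keep-alive; ignored.
		case strings.HasPrefix(line, "event:"):
			cur.Event = strings.TrimSpace(strings.TrimPrefix(line, "event:"))
		case strings.HasPrefix(line, "data:"):
			if hasData {
				cur.Data += "\n"
			}
			cur.Data += strings.TrimPrefix(strings.TrimPrefix(line, "data:"), " ")
			hasData = true
		}
	}
	return out, sc.Err()
}

func main() {
	frames, err := parseFrames(": ping\n\nevent: activity\ndata: {}\n\ndata: a\ndata: b\n\n")
	if err != nil {
		panic(err)
	}
	for _, f := range frames {
		fmt.Printf("%s %q\n", f.Event, f.Data)
	}
}
```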

func (*StreamManager) SetActivityHub

func (m *StreamManager) SetActivityHub(hub *activity.Hub)

SetActivityHub wires the qui-owned server-event hub and starts forwarding its events (plus keep-alive heartbeats) to connected SSE sessions. It must be called once during startup before the manager begins serving. A nil hub is ignored, leaving the activity channel disabled.

func (*StreamManager) Shutdown

func (m *StreamManager) Shutdown(ctx context.Context) error

func (*StreamManager) Stats

func (m *StreamManager) Stats() StreamStats

Stats returns a snapshot of current SSE activity and lifetime counters.

func (*StreamManager) Unregister

func (m *StreamManager) Unregister(id string)

Unregister removes and cleans up a subscriber when the HTTP connection closes.

type StreamMeta

type StreamMeta struct {
	InstanceID     int       `json:"instanceId"`
	RID            int64     `json:"rid,omitempty"`
	FullUpdate     bool      `json:"fullUpdate,omitempty"`
	Timestamp      time.Time `json:"timestamp"`
	RetryInSeconds int       `json:"retryInSeconds,omitempty"`
	StreamKey      string    `json:"streamKey,omitempty"`
}

StreamMeta carries lightweight metadata about the sync update.

type StreamOptions

type StreamOptions struct {
	InstanceID  int
	InstanceIDs []int
	Page        int
	Limit       int
	Sort        string
	Order       string
	Search      string
	Filters     qbittorrent.FilterOptions
}

StreamOptions captures the torrent view that the subscriber wants to keep in sync.

A subscription is single-instance when InstanceIDs is empty, in which case it is keyed by InstanceID. It is multi-instance (aggregated/cross-instance) when InstanceIDs holds one or more concrete instance ids. A multi-instance subscription is kept in sync by every one of its member instances.
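The single-instance versus multi-instance rule can be expressed as a small helper. This is a sketch of the rule as described, not the package's implementation: memberInstances is a hypothetical function, and StreamOptions is trimmed to the two fields the rule depends on.

```go
package main

import "fmt"

// StreamOptions is trimmed to the two fields relevant to the rule.
type StreamOptions struct {
	InstanceID  int
	InstanceIDs []int
}

// memberInstances (hypothetical) returns the instance ids whose updates
// should refresh the subscription: InstanceID alone when InstanceIDs is
// empty, otherwise a copy of InstanceIDs.
func memberInstances(o StreamOptions) []int {
	if len(o.InstanceIDs) == 0 {
		return []int{o.InstanceID}
	}
	return append([]int(nil), o.InstanceIDs...)
}

func main() {
	fmt.Println(memberInstances(StreamOptions{InstanceID: 3}))
	fmt.Println(memberInstances(StreamOptions{InstanceIDs: []int{1, 2}}))
}
```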

type StreamPayload

type StreamPayload struct {
	Type string                       `json:"type"`
	Data *qbittorrent.TorrentResponse `json:"data,omitempty"`
	Meta *StreamMeta                  `json:"meta,omitempty"`
	Err  string                       `json:"error,omitempty"`
}

StreamPayload is the message envelope sent to the frontend.
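A consumer written in Go could decode the envelope roughly as follows. This sketch makes several assumptions: Data is kept as json.RawMessage because the shape of qbittorrent.TorrentResponse is not shown here, decodePayload is a hypothetical helper, and the sample JSON (including the Type value "example") is invented purely for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// StreamMeta copies the documented fields and JSON tags.
type StreamMeta struct {
	InstanceID     int       `json:"instanceId"`
	RID            int64     `json:"rid,omitempty"`
	FullUpdate     bool      `json:"fullUpdate,omitempty"`
	Timestamp      time.Time `json:"timestamp"`
	RetryInSeconds int       `json:"retryInSeconds,omitempty"`
	StreamKey      string    `json:"streamKey,omitempty"`
}

// StreamPayload mirrors the documented envelope; Data is left undecoded
// because the torrent response type is outside this sketch.
type StreamPayload struct {
	Type string          `json:"type"`
	Data json.RawMessage `json:"data,omitempty"`
	Meta *StreamMeta     `json:"meta,omitempty"`
	Err  string          `json:"error,omitempty"`
}

// decodePayload (hypothetical) parses one SSE data field into the envelope.
func decodePayload(raw []byte) (StreamPayload, error) {
	var p StreamPayload
	err := json.Unmarshal(raw, &p)
	return p, err
}

func main() {
	// Invented sample message, not captured from a real server.
	raw := []byte(`{"type":"example","meta":{"instanceId":1,"rid":42,` +
		`"timestamp":"2026-06-09T12:00:00Z","retryInSeconds":5},"error":"sync failed"}`)
	p, err := decodePayload(raw)
	if err != nil {
		panic(err)
	}
	fmt.Println(p.Type, p.Meta.InstanceID, p.Meta.RetryInSeconds, p.Err)
}
```

Checking Err and Meta.RetryInSeconds before touching Data lets a client handle error envelopes without needing to know the torrent response shape.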

type StreamStats

type StreamStats struct {
	ActiveSubscriptions int    // currently connected subscribers
	ActiveGroups        int    // distinct view groups being served
	ActiveSyncLoops     int    // per-instance sync loops running
	EventsPublished     uint64 // lifetime SSE messages successfully published
	EventsDropped       uint64 // lifetime messages dropped (marshal/publish failures)
	SyncErrors          uint64 // lifetime sync errors propagated to subscribers
}

StreamStats is a point-in-time snapshot of SSE subsystem activity. It is exported so the metrics layer can surface it (e.g. as Prometheus gauges/counters).
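As a rough idea of how a metrics layer might surface this snapshot, the sketch below renders it in the Prometheus text exposition format using only the standard library. The metric names, metricLine, and renderMetrics are all hypothetical; a real integration would more likely use a Prometheus client library.

```go
package main

import (
	"fmt"
	"strings"
)

// StreamStats copies the documented fields.
type StreamStats struct {
	ActiveSubscriptions int
	ActiveGroups        int
	ActiveSyncLoops     int
	EventsPublished     uint64
	EventsDropped       uint64
	SyncErrors          uint64
}

// metricLine (hypothetical) renders one metric with its TYPE hint.
func metricLine(name, kind string, v uint64) string {
	return fmt.Sprintf("# TYPE %s %s\n%s %d\n", name, kind, name, v)
}

// renderMetrics (hypothetical) maps point-in-time fields to gauges and
// lifetime fields to counters. All metric names are made up.
func renderMetrics(s StreamStats) string {
	var b strings.Builder
	b.WriteString(metricLine("sse_active_subscriptions", "gauge", uint64(s.ActiveSubscriptions)))
	b.WriteString(metricLine("sse_active_groups", "gauge", uint64(s.ActiveGroups)))
	b.WriteString(metricLine("sse_active_sync_loops", "gauge", uint64(s.ActiveSyncLoops)))
	b.WriteString(metricLine("sse_events_published_total", "counter", s.EventsPublished))
	b.WriteString(metricLine("sse_events_dropped_total", "counter", s.EventsDropped))
	b.WriteString(metricLine("sse_sync_errors_total", "counter", s.SyncErrors))
	return b.String()
}

func main() {
	fmt.Print(renderMetrics(StreamStats{ActiveSubscriptions: 2, EventsPublished: 10}))
}
```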
