Documentation
¶
Index ¶
- type ActivityPayload
- type StreamManager
- func (m *StreamManager) HandleMainData(instanceID int, data *qbt.MainData)
- func (m *StreamManager) HandleSyncError(instanceID int, err error)
- func (m *StreamManager) PrepareBatch(ctx context.Context, requests []streamRequest) (context.Context, []string, error)
- func (m *StreamManager) Serve(w http.ResponseWriter, r *http.Request)
- func (m *StreamManager) SetActivityHub(hub *activity.Hub)
- func (m *StreamManager) Shutdown(ctx context.Context) error
- func (m *StreamManager) Stats() StreamStats
- func (m *StreamManager) Unregister(id string)
- type StreamMeta
- type StreamOptions
- type StreamPayload
- type StreamStats
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type ActivityPayload ¶
type ActivityPayload struct {
Type string `json:"type"`
Activity *activity.Event `json:"activity,omitempty"`
}
ActivityPayload is the message envelope for qui-owned server activity events. It is intentionally distinct from StreamPayload (whose Data is a torrent response) so the frontend's torrent-stream router never sees activity events: they are delivered as a separate named "activity" SSE event with their own handler that invalidates cached queries.
type StreamManager ¶
type StreamManager struct {
// contains filtered or unexported fields
}
StreamManager owns the SSE server and keeps subscriptions in sync with qBittorrent updates.
Lock hierarchy (acquire in this order to prevent deadlock):
- m.mu (StreamManager.mu) - protects subscriptions, groups, loops
- group.mu (subscriptionGroup.mu) - protects pending queue state
- group.subsMu (subscriptionGroup.subsMu) - protects subscriber list
func NewStreamManager ¶
func NewStreamManager(clientPool *qbittorrent.ClientPool, syncManager syncProvider, instanceStore *models.InstanceStore) *StreamManager
NewStreamManager constructs a manager with a configured SSE server.
func (*StreamManager) HandleMainData ¶
func (m *StreamManager) HandleMainData(instanceID int, data *qbt.MainData)
HandleMainData implements qbittorrent.SyncEventSink.
func (*StreamManager) HandleSyncError ¶
func (m *StreamManager) HandleSyncError(instanceID int, err error)
HandleSyncError implements qbittorrent.SyncEventSink.
func (*StreamManager) PrepareBatch ¶
func (m *StreamManager) PrepareBatch(ctx context.Context, requests []streamRequest) (context.Context, []string, error)
PrepareBatch registers one or more subscribers and returns a context that carries their session ids.
func (*StreamManager) Serve ¶
func (m *StreamManager) Serve(w http.ResponseWriter, r *http.Request)
Serve implements the HTTP handler for GET /stream and multiplexes multiple subscriptions over one SSE session.
func (*StreamManager) SetActivityHub ¶
func (m *StreamManager) SetActivityHub(hub *activity.Hub)
SetActivityHub wires the qui-owned server-event hub and starts forwarding its events (plus keep-alive heartbeats) to connected SSE sessions. It must be called once during startup before the manager begins serving. A nil hub is ignored, leaving the activity channel disabled.
func (*StreamManager) Stats ¶
func (m *StreamManager) Stats() StreamStats
Stats returns a snapshot of current SSE activity and lifetime counters.
func (*StreamManager) Unregister ¶
func (m *StreamManager) Unregister(id string)
Unregister removes and cleans up a subscriber when the HTTP connection closes.
type StreamMeta ¶
type StreamMeta struct {
InstanceID int `json:"instanceId"`
RID int64 `json:"rid,omitempty"`
FullUpdate bool `json:"fullUpdate,omitempty"`
Timestamp time.Time `json:"timestamp"`
RetryInSeconds int `json:"retryInSeconds,omitempty"`
StreamKey string `json:"streamKey,omitempty"`
}
StreamMeta carries lightweight metadata about the sync update.
type StreamOptions ¶
type StreamOptions struct {
InstanceID int
InstanceIDs []int
Page int
Limit int
Sort string
Order string
Search string
Filters qbittorrent.FilterOptions
}
StreamOptions captures the torrent view that the subscriber wants to keep in sync.
A subscription is single-instance when InstanceIDs is empty (keyed by InstanceID), or multi-instance (aggregated/cross-instance) when InstanceIDs holds one or more concrete instance ids. Multi-instance subscriptions are kept in sync by every one of their member instances.
type StreamPayload ¶
type StreamPayload struct {
Type string `json:"type"`
Data *qbittorrent.TorrentResponse `json:"data,omitempty"`
Meta *StreamMeta `json:"meta,omitempty"`
Err string `json:"error,omitempty"`
}
StreamPayload is the message envelope sent to the frontend.
type StreamStats ¶
type StreamStats struct {
ActiveSubscriptions int // currently connected subscribers
ActiveGroups int // distinct view groups being served
ActiveSyncLoops int // per-instance sync loops running
EventsPublished uint64 // lifetime SSE messages successfully published
EventsDropped uint64 // lifetime messages dropped (marshal/publish failures)
SyncErrors uint64 // lifetime sync errors propagated to subscribers
}
StreamStats is a point-in-time snapshot of SSE subsystem activity. It is exported so the metrics layer can surface it (e.g. as Prometheus gauges/counters).