Documentation
¶
Overview ¶
Package sessions tracks live playback sessions across every protocol Open-Streamer serves (HLS, DASH, RTMP, SRT, RTSP) so operators can answer "who is watching <stream> right now?".
Design rules:
- State is in-memory only. Restart = lost; viewers reconnect → new sessions.
- HTTP-based sessions (HLS/DASH) are keyed by a deterministic fingerprint so consecutive segment GETs from one viewer collapse onto a single record.
- Connection-bound sessions (RTMP/SRT/RTSP) are keyed by a fresh UUID; the transport layer signals close via the Closer returned from OpenConn.
- All open / close events are published on the event bus so analytics or persistence can be added without changing this package.
Index ¶
- func HTTPMiddleware(t Tracker) func(http.Handler) http.Handler
- func TokenFromQuery(rawQuery string) string
- type Closer
- type ConnHit
- type Filter
- type FilterStatus
- type GeoIPResolver
- type HTTPHit
- type MaxMindGeoIP
- type NullGeoIP
- type Service
- func (s Service) Get(id string) (*domain.PlaySession, bool)
- func (s Service) Kick(id string) bool
- func (s Service) List(filter Filter) []*domain.PlaySession
- func (s Service) OpenConn(ctx context.Context, h ConnHit) (*domain.PlaySession, Closer)
- func (s *Service) Run(ctx context.Context)
- func (s Service) Stats() Stats
- func (s Service) TrackHTTP(ctx context.Context, h HTTPHit) *domain.PlaySession
- func (s *Service) UpdateConfig(cfg config.SessionsConfig)
- type Stats
- type Tracker
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func HTTPMiddleware ¶
HTTPMiddleware returns a net/http middleware that records a session hit for every successful HLS / DASH manifest or segment request served from /<stream_code>/… (the URL layout owned by mediaserve.Mount).
Behaviour:
- Stream code is parsed from the URL path's first segment. We deliberately don't rely on chi.URLParam here because chi populates the routing context AFTER the middleware chain — the middleware sees an empty param. Manual parsing matches mediaserve's "/<code>/<file>" layout.
- Only file extensions known to mediaserve are counted: .m3u8 / .ts for HLS; .mpd / .m4s / .mp4 for DASH. Other extensions pass through untouched.
- Bytes counted are exactly what was written to the wire — we wrap the ResponseWriter and read its byte counter.
- Non-2xx responses still record a hit (so a broken segment URL still reveals the viewer in the dashboard) but credit zero bytes.
func TokenFromQuery ¶
TokenFromQuery sniffs an SRT-style "live/foo?token=…" or RTSP query for a `token` field. Returns "" when the query is empty / unparseable / has no token. Helper exported for OpenConn callers (publisher's SRT subscribe path) to avoid each protocol re-implementing the parse.
Types ¶
type Closer ¶
type Closer interface {
Close(reason domain.SessionCloseReason, addBytes int64)
Touch(addBytes int64)
}
Closer ends a connection-bound session. addBytes is the cumulative bytes sent on the underlying transport; pass 0 if the protocol can't measure. Both forms are idempotent.
Touch refreshes the session's UpdatedAt so the idle reaper (which closes any session whose UpdatedAt is older than `idle_timeout_sec`, default 30s) does not mistake an actively-streaming RTMP / SRT / RTSP session for an abandoned one. Connection-bound protocols don't naturally update UpdatedAt the way HLS does (every segment GET refreshes it), so the publisher must call Touch from its data-write hot path. Throttling is the caller's responsibility — see playSession.touch.
addBytes credits an incremental byte delta to the live session record so the API surfaces a growing counter mid-stream (RTMP / SRT / MPEGTS); pass 0 when the protocol can only measure the total at close time (RTSP — gortsplib serialises packets internally).
type ConnHit ¶
type ConnHit struct {
StreamCode domain.StreamCode
Protocol domain.SessionProto // SessionProtoRTMP / SessionProtoSRT / SessionProtoRTSP
RemoteAddr string // ip:port from net.Conn.RemoteAddr().String()
UserAgent string // RTMP flashVer / RTSP User-Agent / "" for SRT
Token string // optional, e.g. SRT streamid query
Secure bool // true for RTMPS / SRTS / RTSPS
}
ConnHit is the input to OpenConn for connection-bound protocols.
type Filter ¶
type Filter struct {
StreamCode domain.StreamCode // exact match; "" = any
Protocol domain.SessionProto
Status FilterStatus
Limit int // <=0 = unlimited
}
Filter is the union of selectors accepted by List. Zero value matches every session.
type FilterStatus ¶
type FilterStatus string
FilterStatus narrows List by lifecycle state.
const ( FilterStatusAny FilterStatus = "" // active + recently closed FilterStatusActive FilterStatus = "active" // ClosedAt == nil FilterStatusClosed FilterStatus = "closed" // ClosedAt != nil )
FilterStatus values.
type GeoIPResolver ¶
GeoIPResolver maps a remote IP to an ISO 3166-1 alpha-2 country code. Implementations must be safe for concurrent use and return "" when the address cannot be resolved (private address, missing DB, lookup error).
The default registered in DI is NullGeoIP — no lookup, Country always "". When `sessions.geoip_db_path` is set in config, the wiring layer in `cmd/server/main.go` opens the .mmdb via NewMaxMindGeoIP and registers that as the resolver instead. Operators can also swap in any custom implementation (IP2Location, in-house service) by replacing the DI binding before service start.
type HTTPHit ¶
type HTTPHit struct {
StreamCode domain.StreamCode
Protocol domain.SessionProto // SessionProtoHLS or SessionProtoDASH
IP string // already extracted; no port
UserAgent string
Referer string
Query string // raw r.URL.RawQuery — stored only on session open
Token string // ?token=… value if present
Secure bool
BytesDelta int64 // bytes the most recent response wrote to the client
}
HTTPHit is the input to TrackHTTP. Build with NewHTTPHit from an http.Request so the IP / UA / Referer extraction is consistent.
func NewHTTPHit ¶
func NewHTTPHit(r *http.Request, code domain.StreamCode, proto domain.SessionProto, bytesDelta int64) HTTPHit
NewHTTPHit extracts the per-request fields TrackHTTP needs from an http.Request. Centralising this avoids per-protocol drift in how IP / UA / token are sniffed.
type MaxMindGeoIP ¶ added in v0.0.92
type MaxMindGeoIP struct {
// contains filtered or unexported fields
}
MaxMindGeoIP wraps a MaxMind .mmdb reader and resolves IPs into ISO 3166-1 alpha-2 country codes for PlaySession.Country.
func NewMaxMindGeoIP ¶ added in v0.0.92
func NewMaxMindGeoIP(path string) (*MaxMindGeoIP, error)
NewMaxMindGeoIP opens the .mmdb file at path and returns a resolver that looks up the Country code per IP. Returns an error when the file is missing, malformed, or not a Country / City database (those are the two schemas that surface a Country.IsoCode field; ASN / ISP databases do not).
func (*MaxMindGeoIP) Close ¶ added in v0.0.92
func (m *MaxMindGeoIP) Close() error
Close releases the underlying mmap'd database file. Idempotent. Implements io.Closer so the wiring layer can call it on shutdown without type assertions; callers that hold a *MaxMindGeoIP can also defer it directly.
func (*MaxMindGeoIP) Country ¶ added in v0.0.92
func (m *MaxMindGeoIP) Country(ip net.IP) string
Country implements GeoIPResolver. Returns "" when the IP is unroutable, not in the database, or any error path the underlying lookup hits — the caller (PlaySession.Country) treats "" as "GeoIP unavailable", so silent fallback is the right behaviour. Per-call errors are logged at debug to avoid log spam under heavy session churn (closed CDN sessions can hit "address not found" thousands of times per minute).
type NullGeoIP ¶
type NullGeoIP struct{}
NullGeoIP is a GeoIPResolver that always returns "". Used when GeoIP is disabled or no real resolver has been wired. Public so external code can reference it as an explicit "disabled" marker.
type Service ¶
type Service struct {
// contains filtered or unexported fields
}
Service is the public DI handle. It implements Tracker.
func New ¶
New constructs the Service for samber/do. GeoIPResolver is optional in DI — if no provider is registered, NullGeoIP is used. Metrics is also optional so unit tests that wire only sessions can construct the service.
func NewServiceForTesting ¶
func NewServiceForTesting(cfg config.SessionsConfig, bus events.Bus, geo GeoIPResolver) *Service
NewServiceForTesting builds a Service without going through DI — for unit tests that need a real Tracker but don't want a do.Injector. Bus and resolver may be nil.
func (Service) Get ¶
func (s Service) Get(id string) (*domain.PlaySession, bool)
Get implements Tracker.
func (Service) List ¶
func (s Service) List(filter Filter) []*domain.PlaySession
List implements Tracker.
Results are sorted by OpenedAt ascending (oldest session first) so that repeated polls from the dashboard return rows in a stable order — the underlying map iteration is randomised by Go and would make the UI reshuffle on every refresh. Sort happens before Limit so a paged caller always sees the same window for a given snapshot of state.
Tie-break by ID keeps two sessions opened in the same nanosecond deterministic across calls (rare in production, common in tests where the clock is faked).
func (*Service) Run ¶
Run starts the idle reaper. Always runs — the reaper itself checks the hot-reloadable enabled flag each tick and skips reaping when disabled. This way an operator toggling Enabled at runtime via /config takes effect without restarting the goroutine. Blocks until ctx is cancelled, then closes every still-active session with reason=shutdown so subscribers get a final event.
func (Service) TrackHTTP ¶
func (s Service) TrackHTTP(ctx context.Context, h HTTPHit) *domain.PlaySession
TrackHTTP implements Tracker.
func (*Service) UpdateConfig ¶ added in v0.0.43
func (s *Service) UpdateConfig(cfg config.SessionsConfig)
UpdateConfig hot-swaps the runtime config. Called by runtime.Manager.diff when the persisted SessionsConfig section changes — no restart needed. In-flight sessions keep their state; new idle/max-lifetime windows take effect on the next reaper tick.
type Stats ¶
type Stats struct {
Active int `json:"active"`
OpenedTotal int64 `json:"opened_total"`
ClosedTotal int64 `json:"closed_total"`
IdleClosedTotal int64 `json:"idle_closed_total"`
KickedTotal int64 `json:"kicked_total"`
}
Stats summarises the live tracker state.
type Tracker ¶
type Tracker interface {
// TrackHTTP records activity for an HTTP-based pull protocol (HLS / DASH).
// Called once per segment / manifest GET. Idempotent on the (stream, fp)
// pair within the idle window — repeated calls extend UpdatedAt and
// accumulate Bytes onto an existing session instead of opening a new one.
//
// The returned session is a snapshot copy safe to retain by the caller;
// further mutations happen on the live record held inside the tracker.
TrackHTTP(ctx context.Context, h HTTPHit) *domain.PlaySession
// OpenConn opens a session for a connection-bound protocol (RTMP / SRT /
// RTSP). Returns the session record and a Closer the caller must invoke
// when the underlying transport ends. The Closer is idempotent.
OpenConn(ctx context.Context, h ConnHit) (*domain.PlaySession, Closer)
// List returns a snapshot of every session matching the filter. Active
// (ClosedAt == nil) and idle entries are included; filter by Status.
List(filter Filter) []*domain.PlaySession
// Get returns the session with the given ID, or false when missing.
Get(id string) (*domain.PlaySession, bool)
// Kick force-closes an active session and emits a closed event with
// reason="kicked". Returns false when the id is unknown or already closed.
Kick(id string) bool
// Stats returns aggregate counters useful for /metrics or readiness
// probes. Does not allocate per-session storage.
Stats() Stats
}
Tracker is the public contract that publisher / api layers use to record playback sessions. The implementation is in-memory; restart drops state.
All methods are safe for concurrent use.