sessions

package
v0.0.92 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 6, 2026 License: MIT Imports: 23 Imported by: 0

Documentation

Overview

Package sessions tracks live playback sessions across every protocol Open-Streamer serves (HLS, DASH, RTMP, SRT, RTSP) so operators can answer "who is watching <stream> right now?".

Design rules:

  • State is in-memory only. Restart = lost; viewers reconnect → new sessions.
  • HTTP-based sessions (HLS/DASH) are keyed by a deterministic fingerprint so consecutive segment GETs from one viewer collapse onto a single record.
  • Connection-bound sessions (RTMP/SRT/RTSP) are keyed by a fresh UUID; the transport layer signals close via the Closer returned from OpenConn.
  • All open / close events are published on the event bus so analytics or persistence can be added without changing this package.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func HTTPMiddleware

func HTTPMiddleware(t Tracker) func(http.Handler) http.Handler

HTTPMiddleware returns a net/http middleware that records a session hit for every successful HLS / DASH manifest or segment request served from /<stream_code>/… (the URL layout owned by mediaserve.Mount).

Behaviour:

  • Stream code is parsed from the URL path's first segment. We deliberately don't rely on chi.URLParam here because chi populates the routing context AFTER the middleware chain — the middleware sees an empty param. Manual parsing matches mediaserve's "/<code>/<file>" layout.
  • Only file extensions known to mediaserve are counted: .m3u8 / .ts for HLS; .mpd / .m4s / .mp4 for DASH. Other extensions pass through untouched.
  • Bytes counted are exactly what was written to the wire — we wrap the ResponseWriter and read its byte counter.
  • Non-2xx responses still record a hit (so a broken segment URL still reveals the viewer in the dashboard) but credit zero bytes.

func TokenFromQuery

func TokenFromQuery(rawQuery string) string

TokenFromQuery sniffs an SRT-style "live/foo?token=…" or RTSP query for a `token` field. Returns "" when the query is empty / unparseable / has no token. Helper exported for OpenConn callers (publisher's SRT subscribe path) to avoid each protocol re-implementing the parse.

Types

type Closer

type Closer interface {
	Close(reason domain.SessionCloseReason, addBytes int64)
	Touch()
}

Closer ends a connection-bound session. addBytes is the cumulative bytes sent on the underlying transport; pass 0 if the protocol can't measure. Both forms are idempotent.

Touch refreshes the session's UpdatedAt so the idle reaper (which closes any session whose UpdatedAt is older than `idle_timeout_sec`, default 30s) does not mistake an actively-streaming RTMP / SRT / RTSP session for an abandoned one. Connection-bound protocols don't naturally update UpdatedAt the way HLS does (every segment GET refreshes it), so the publisher must call Touch from its data-write hot path. Throttling is the caller's responsibility — see playSession.touch.

type ConnHit

type ConnHit struct {
	StreamCode domain.StreamCode
	Protocol   domain.SessionProto // SessionProtoRTMP / SessionProtoSRT / SessionProtoRTSP
	RemoteAddr string              // ip:port from net.Conn.RemoteAddr().String()
	UserAgent  string              // RTMP flashVer / RTSP User-Agent / "" for SRT
	Token      string              // optional, e.g. SRT streamid query
	Secure     bool                // true for RTMPS / SRTS / RTSPS
}

ConnHit is the input to OpenConn for connection-bound protocols.

type Filter

type Filter struct {
	StreamCode domain.StreamCode // exact match; "" = any
	Protocol   domain.SessionProto
	Status     FilterStatus
	Limit      int // <=0 = unlimited
}

Filter is the union of selectors accepted by List. Zero value matches every session.

type FilterStatus

type FilterStatus string

FilterStatus narrows List by lifecycle state.

const (
	FilterStatusAny    FilterStatus = ""       // active + recently closed
	FilterStatusActive FilterStatus = "active" // ClosedAt == nil
	FilterStatusClosed FilterStatus = "closed" // ClosedAt != nil
)

FilterStatus values.

type GeoIPResolver

type GeoIPResolver interface {
	Country(ip net.IP) string
}

GeoIPResolver maps a remote IP to an ISO 3166-1 alpha-2 country code. Implementations must be safe for concurrent use and return "" when the address cannot be resolved (private address, missing DB, lookup error).

The default registered in DI is NullGeoIP — no lookup, Country always "". When `sessions.geoip_db_path` is set in config, the wiring layer in `cmd/server/main.go` opens the .mmdb via NewMaxMindGeoIP and registers that as the resolver instead. Operators can also swap in any custom implementation (IP2Location, in-house service) by replacing the DI binding before service start.

type HTTPHit

type HTTPHit struct {
	StreamCode domain.StreamCode
	Protocol   domain.SessionProto // SessionProtoHLS or SessionProtoDASH
	IP         string              // already extracted; no port
	UserAgent  string
	Referer    string
	Query      string // raw r.URL.RawQuery — stored only on session open
	Token      string // ?token=… value if present
	Secure     bool
	BytesDelta int64 // bytes the most recent response wrote to the client
}

HTTPHit is the input to TrackHTTP. Build with NewHTTPHit from an http.Request so the IP / UA / Referer extraction is consistent.

func NewHTTPHit

func NewHTTPHit(r *http.Request, code domain.StreamCode, proto domain.SessionProto, bytesDelta int64) HTTPHit

NewHTTPHit extracts the per-request fields TrackHTTP needs from an http.Request. Centralising this avoids per-protocol drift in how IP / UA / token are sniffed.

type MaxMindGeoIP added in v0.0.92

type MaxMindGeoIP struct {
	// contains filtered or unexported fields
}

MaxMindGeoIP wraps a MaxMind .mmdb reader and resolves IPs into ISO 3166-1 alpha-2 country codes for PlaySession.Country.

func NewMaxMindGeoIP added in v0.0.92

func NewMaxMindGeoIP(path string) (*MaxMindGeoIP, error)

NewMaxMindGeoIP opens the .mmdb file at path and returns a resolver that looks up the Country code per IP. Returns an error when the file is missing, malformed, or not a Country / City database (those are the two schemas that surface a Country.IsoCode field; ASN / ISP databases do not).

func (*MaxMindGeoIP) Close added in v0.0.92

func (m *MaxMindGeoIP) Close() error

Close releases the underlying mmap'd database file. Idempotent. Implements io.Closer so the wiring layer can call it on shutdown without type assertions; callers that hold a *MaxMindGeoIP can also defer it directly.

func (*MaxMindGeoIP) Country added in v0.0.92

func (m *MaxMindGeoIP) Country(ip net.IP) string

Country implements GeoIPResolver. Returns "" when the IP is unroutable, not in the database, or any error path the underlying lookup hits — the caller (PlaySession.Country) treats "" as "GeoIP unavailable", so silent fallback is the right behaviour. Per-call errors are logged at debug to avoid log spam under heavy session churn (closed CDN sessions can hit "address not found" thousands of times per minute).

type NullGeoIP

type NullGeoIP struct{}

NullGeoIP is a GeoIPResolver that always returns "". Used when GeoIP is disabled or no real resolver has been wired. Public so external code can reference it as an explicit "disabled" marker.

func (NullGeoIP) Country

func (NullGeoIP) Country(net.IP) string

Country implements GeoIPResolver — always returns the empty string.

type Service

type Service struct {
	// contains filtered or unexported fields
}

Service is the public DI handle. It implements Tracker.

func New

func New(i do.Injector) (*Service, error)

New constructs the Service for samber/do. GeoIPResolver is optional in DI — if no provider is registered, NullGeoIP is used. Metrics is also optional so unit tests that wire only sessions can construct the service.

func NewServiceForTesting

func NewServiceForTesting(cfg config.SessionsConfig, bus events.Bus, geo GeoIPResolver) *Service

NewServiceForTesting builds a Service without going through DI — for unit tests that need a real Tracker but don't want a do.Injector. Bus and resolver may be nil.

func (Service) Get

func (s Service) Get(id string) (*domain.PlaySession, bool)

Get implements Tracker.

func (Service) Kick

func (s Service) Kick(id string) bool

Kick implements Tracker.

func (Service) List

func (s Service) List(filter Filter) []*domain.PlaySession

List implements Tracker.

func (Service) OpenConn

func (s Service) OpenConn(ctx context.Context, h ConnHit) (*domain.PlaySession, Closer)

OpenConn implements Tracker.

func (*Service) Run

func (s *Service) Run(ctx context.Context)

Run starts the idle reaper. Always runs — the reaper itself checks the hot-reloadable enabled flag each tick and skips reaping when disabled. This way an operator toggling Enabled at runtime via /config takes effect without restarting the goroutine. Blocks until ctx is cancelled, then closes every still-active session with reason=shutdown so subscribers get a final event.

func (Service) Stats

func (s Service) Stats() Stats

Stats implements Tracker.

func (Service) TrackHTTP

func (s Service) TrackHTTP(ctx context.Context, h HTTPHit) *domain.PlaySession

TrackHTTP implements Tracker.

func (*Service) UpdateConfig added in v0.0.43

func (s *Service) UpdateConfig(cfg config.SessionsConfig)

UpdateConfig hot-swaps the runtime config. Called by runtime.Manager.diff when the persisted SessionsConfig section changes — no restart needed. In-flight sessions keep their state; new idle/max-lifetime windows take effect on the next reaper tick.

type Stats

type Stats struct {
	Active          int   `json:"active"`
	OpenedTotal     int64 `json:"opened_total"`
	ClosedTotal     int64 `json:"closed_total"`
	IdleClosedTotal int64 `json:"idle_closed_total"`
	KickedTotal     int64 `json:"kicked_total"`
}

Stats summarises the live tracker state.

type Tracker

type Tracker interface {
	// TrackHTTP records activity for an HTTP-based pull protocol (HLS / DASH).
	// Called once per segment / manifest GET. Idempotent on the (stream, fp)
	// pair within the idle window — repeated calls extend UpdatedAt and
	// accumulate Bytes onto an existing session instead of opening a new one.
	//
	// The returned session is a snapshot copy safe to retain by the caller;
	// further mutations happen on the live record held inside the tracker.
	TrackHTTP(ctx context.Context, h HTTPHit) *domain.PlaySession

	// OpenConn opens a session for a connection-bound protocol (RTMP / SRT /
	// RTSP). Returns the session record and a Closer the caller must invoke
	// when the underlying transport ends. The Closer is idempotent.
	OpenConn(ctx context.Context, h ConnHit) (*domain.PlaySession, Closer)

	// List returns a snapshot of every session matching the filter. Active
	// (ClosedAt == nil) and idle entries are included; filter by Status.
	List(filter Filter) []*domain.PlaySession

	// Get returns the session with the given ID, or false when missing.
	Get(id string) (*domain.PlaySession, bool)

	// Kick force-closes an active session and emits a closed event with
	// reason="kicked". Returns false when the id is unknown or already closed.
	Kick(id string) bool

	// Stats returns aggregate counters useful for /metrics or readiness
	// probes. Does not allocate per-session storage.
	Stats() Stats
}

Tracker is the public contract that publisher / api layers use to record playback sessions. The implementation is in-memory; restart drops state.

All methods are safe for concurrent use.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL