streaming

package
v1.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 10, 2026 License: AGPL-3.0 Imports: 37 Imported by: 0

Documentation

Overview

Package streaming provides serverless-optimized media streaming functionality.

SERVERLESS DESIGN PRINCIPLES:

  1. No Background Processes: This package avoids long-running goroutines, timers, or polling mechanisms that are incompatible with Lambda's execution model.

  2. DynamoDB TTL for Cleanup: Session and cache expiration is handled automatically by DynamoDB TTL rather than manual cleanup processes. This eliminates the need for background cleanup tasks.

  3. Stateless Operations: Each Lambda invocation operates independently without relying on shared in-memory state between invocations.

  4. Cost-Optimized: Uses DynamoDB on-demand billing and avoids unnecessary operations that would increase costs in a serverless environment.

Index

Constants

View Source
const (
	QualityAuto   = types.QualityAuto
	QualityLow    = types.QualityLow
	QualityMedium = types.QualityMedium
	QualityHigh   = types.QualityHigh
	QualitySource = types.QualitySource

	// Additional quality levels specific to streaming
	Quality4K    Quality = "4k"
	Quality1080p Quality = "1080p"
	Quality720p  Quality = "720p"
	Quality480p  Quality = "480p"
	Quality360p  Quality = "360p"
	Quality240p  Quality = "240p"
)

Re-export constants

View Source
const (
	FormatHLS    = types.FormatHLS
	FormatDASH   = types.FormatDASH
	FormatSource = types.FormatSource
)

Streaming format constants

Variables

View Source
var (
	ErrMediaMetadataNotFound    = errors.NewAppError(errors.CodeNotFound, errors.CategoryMedia, "media metadata not found")
	ErrGetMetadataFromDynamoDB  = errors.FailedToGet("metadata from DynamoDB", stdErrors.New("failed to get metadata from DynamoDB"))
	ErrCreateMetadataInDynamoDB = errors.FailedToCreate("metadata in DynamoDB", stdErrors.New("failed to create metadata in DynamoDB"))
	ErrUpdateMetadataInDynamoDB = errors.FailedToUpdate("metadata in DynamoDB", stdErrors.New("failed to update metadata in DynamoDB"))
)

Media metadata errors

View Source
var (
	ErrCheckManifestExists        = errors.ProcessingFailed("check manifest exists", stdErrors.New("failed to check manifest existence"))
	ErrSaveManifestToS3           = errors.ProcessingFailed("save manifest to S3", stdErrors.New("failed to save manifest to S3"))
	ErrGetSegmentInfo             = errors.FailedToGet("segment info", stdErrors.New("failed to get segment info"))
	ErrListSegments               = errors.FailedToList("segments", stdErrors.New("failed to list segments"))
	ErrCreateIndexForQuality      = errors.ProcessingFailed("create index for quality", stdErrors.New("failed to create index for quality"))
	ErrCreateSegmentsDirectory    = errors.ProcessingFailed("create segments directory for quality", stdErrors.New("failed to create segments directory for quality"))
	ErrCreateMasterIndex          = errors.ProcessingFailed("create master index", stdErrors.New("failed to create master index"))
	ErrGeneratePresignedUploadURL = errors.ProcessingFailed("generate presigned upload URL", stdErrors.New("failed to generate presigned upload URL"))
)

S3 operation errors

View Source
var (
	ErrCloudFrontNotConfigured              = errors.ConfigurationMissing("cloudfront_config")
	ErrInvalidCloudFrontPrivateKeyPath      = errors.ConfigurationInvalid("cloudfront_private_key_path", "invalid path")
	ErrFailedToReadCloudFrontPrivateKeyFile = errors.ProcessingFailed("CloudFront private key file read", stdErrors.New("failed to read CloudFront private key file"))
	ErrCloudFrontPrivateKeyNotProvided      = errors.ConfigurationMissing("cloudfront_private_key")
	ErrInvalidRSAPrivateKeyPEM              = errors.InvalidPrivateKeyType()
	ErrFailedToParseRSAPrivateKey           = errors.ProcessingFailed("RSA private key parsing", stdErrors.New("failed to parse RSA private key"))
)

CloudFront configuration errors

View Source
var (
	ErrFailedToGetKeyframeData    = errors.FailedToGet("keyframe data", stdErrors.New("failed to get keyframe data"))
	ErrFailedToReadIFramePlaylist = errors.ProcessingFailed("I-frame playlist read", stdErrors.New("failed to read I-frame playlist"))
	ErrFailedToReadKeyframeData   = errors.ProcessingFailed("keyframe data read", stdErrors.New("failed to read keyframe data"))
)

Keyframe data errors

View Source
var (
	ErrCreateSession          = errors.FailedToCreate("session", stdErrors.New("failed to create session"))
	ErrGetSession             = errors.FailedToGet("session", stdErrors.New("failed to get session"))
	ErrUpdateSession          = errors.FailedToUpdate("session", stdErrors.New("failed to update session"))
	ErrEndSession             = errors.ProcessingFailed("end session", stdErrors.New("failed to end session"))
	ErrQueryUserSessions      = errors.FailedToQuery("user sessions", stdErrors.New("failed to query user sessions"))
	ErrScanMediaSessions      = errors.FailedToQuery("media sessions", stdErrors.New("failed to scan media sessions"))
	ErrCleanupExpiredSessions = errors.ProcessingFailed("cleanup expired sessions", stdErrors.New("failed to cleanup expired sessions"))
)

Session management errors

Functions

func GetSegmentDurationFromPlaylist

func GetSegmentDurationFromPlaylist(playlistContent string) (float64, error)

GetSegmentDurationFromPlaylist extracts actual segment duration from a playlist

Types

type AdaptationSet

type AdaptationSet struct {
	ID               string           `xml:"id,attr"`
	MimeType         string           `xml:"mimeType,attr"`
	SegmentAlignment bool             `xml:"segmentAlignment,attr"`
	StartWithSAP     int              `xml:"startWithSAP,attr"`
	MaxWidth         int              `xml:"maxWidth,attr"`
	MaxHeight        int              `xml:"maxHeight,attr"`
	MaxFrameRate     string           `xml:"maxFrameRate,attr"`
	Par              string           `xml:"par,attr"`
	Representations  []Representation `xml:"Representation"`
}

AdaptationSet represents a set of interchangeable media components

type AdaptiveQualitySelector

type AdaptiveQualitySelector struct {
	// contains filtered or unexported fields
}

AdaptiveQualitySelector implements intelligent quality selection

func NewAdaptiveQualitySelector

func NewAdaptiveQualitySelector(logger *zap.Logger) *AdaptiveQualitySelector

NewAdaptiveQualitySelector creates a new quality selector

func (*AdaptiveQualitySelector) CleanupMetrics

func (aqs *AdaptiveQualitySelector) CleanupMetrics(maxAge time.Duration)

CleanupMetrics removes old metrics from cache

func (*AdaptiveQualitySelector) GetQualityMetrics

func (aqs *AdaptiveQualitySelector) GetQualityMetrics(sessionID string) *QualityMetrics

GetQualityMetrics retrieves metrics for a session

func (*AdaptiveQualitySelector) SelectQuality

func (aqs *AdaptiveQualitySelector) SelectQuality(bandwidth int, bufferHealth float64, availableQualities []Quality) Quality

SelectQuality chooses the optimal quality based on bandwidth and buffer health

func (*AdaptiveQualitySelector) SelectQualityWithSession

func (aqs *AdaptiveQualitySelector) SelectQualityWithSession(sessionID string, bandwidth int, bufferHealth float64, availableQualities []Quality) Quality

SelectQualityWithSession chooses the optimal quality using real session metrics

func (*AdaptiveQualitySelector) SetMetricsTracker

func (aqs *AdaptiveQualitySelector) SetMetricsTracker(metricsTracker *MetricsTracker)

SetMetricsTracker sets the metrics tracker for real-time data access

func (*AdaptiveQualitySelector) UpdateMetrics

func (aqs *AdaptiveQualitySelector) UpdateMetrics(sessionID string, rebufferEvents int, qualitySwitches int)

UpdateMetrics updates quality selection metrics

type AudioTrack

type AudioTrack struct {
	Language string
	Bitrate  int
	Codec    string
	BaseURL  string
}

AudioTrack represents an audio track

type BandwidthMeasurement

type BandwidthMeasurement struct {
	UserID    string
	Bandwidth int // in kbps
	Timestamp time.Time
}

BandwidthMeasurement represents a single bandwidth measurement

type BandwidthStats

type BandwidthStats struct {
	UserID            string
	TotalBytes        int64
	SessionBytes      int64
	AverageBandwidth  int // in kbps
	PeakBandwidth     int // in kbps
	LastMeasurement   time.Time
	MeasurementWindow time.Duration
}

BandwidthStats tracks bandwidth usage for a user

type BandwidthTracker

type BandwidthTracker struct {
	// contains filtered or unexported fields
}

BandwidthTracker tracks bandwidth usage for users

func NewBandwidthTracker

func NewBandwidthTracker(storage core.RepositoryStorage, logger *zap.Logger, costTracker CostTracker, cloudWatch *cloudwatch.Client) *BandwidthTracker

NewBandwidthTracker creates a new bandwidth tracker

func (*BandwidthTracker) GetBandwidthHistory

func (bt *BandwidthTracker) GetBandwidthHistory(ctx context.Context, userID string, duration time.Duration) ([]BandwidthMeasurement, error)

GetBandwidthHistory retrieves bandwidth measurement history from CloudWatch

func (*BandwidthTracker) GetBandwidthStats

func (bt *BandwidthTracker) GetBandwidthStats(ctx context.Context, userID string) (*BandwidthStats, error)

GetBandwidthStats retrieves bandwidth statistics for a user

func (*BandwidthTracker) GetOptimalQuality

func (bt *BandwidthTracker) GetOptimalQuality(ctx context.Context, userID string, availableBandwidth int) Quality

GetOptimalQuality determines the best quality based on user's bandwidth

func (*BandwidthTracker) RecordBandwidthMeasurement

func (bt *BandwidthTracker) RecordBandwidthMeasurement(ctx context.Context, userID string, bandwidth int) error

RecordBandwidthMeasurement records a bandwidth measurement sample with CloudWatch integration

func (*BandwidthTracker) TrackBandwidth

func (bt *BandwidthTracker) TrackBandwidth(ctx context.Context, userID string, bytesTransferred int64) error

TrackBandwidth records bandwidth usage for a user

type BaseURL

type BaseURL struct {
	Value string `xml:",chardata"`
}

BaseURL represents the base URL for media segments

type BufferHealthSample

type BufferHealthSample struct {
	Timestamp   time.Time
	BufferLevel time.Duration
	Quality     Quality
	Bandwidth   int // kbps
}

BufferHealthSample captures buffer health at a point in time

type CostTracker

type CostTracker interface {
	TrackDynamoRead(units int)
	TrackDynamoWrite(units int)
}

CostTracker interface for tracking AWS costs

type DASHGenerator

type DASHGenerator struct {
	// contains filtered or unexported fields
}

DASHGenerator handles DASH manifest generation

func NewDASHGenerator

func NewDASHGenerator(config *StreamingConfig, storage MediaStorage) *DASHGenerator

NewDASHGenerator creates a new DASH generator

func (*DASHGenerator) GenerateAudioAdaptationSet

func (g *DASHGenerator) GenerateAudioAdaptationSet(audioTracks []AudioTrack) AdaptationSet

GenerateAudioAdaptationSet creates an audio adaptation set

func (*DASHGenerator) GenerateLiveMPD

func (g *DASHGenerator) GenerateLiveMPD(mediaID string, windowSize int) (*DASHManifest, error)

GenerateLiveMPD generates a comprehensive live DASH manifest for real-time streaming

func (*DASHGenerator) GenerateLiveMPDContent

func (g *DASHGenerator) GenerateLiveMPDContent(manifest *DASHManifest) (string, error)

GenerateLiveMPDContent generates the live DASH MPD XML content

func (*DASHGenerator) GenerateLiveMPDContentWithSubtitles

func (g *DASHGenerator) GenerateLiveMPDContentWithSubtitles(manifest *DASHManifest, subtitles []SubtitleTrack) (string, error)

GenerateLiveMPDContentWithSubtitles generates live DASH MPD content with subtitle support

func (*DASHGenerator) GenerateMPD

func (g *DASHGenerator) GenerateMPD(mediaID string, metadata *MediaMetadata) (*DASHManifest, error)

GenerateMPD generates a DASH Media Presentation Description

func (*DASHGenerator) GenerateMPDContent

func (g *DASHGenerator) GenerateMPDContent(manifest *DASHManifest) (string, error)

GenerateMPDContent generates the actual MPD XML content

func (*DASHGenerator) GenerateMPDWithSubtitles

func (g *DASHGenerator) GenerateMPDWithSubtitles(manifest *DASHManifest, subtitles []SubtitleTrack) (string, error)

GenerateMPDWithSubtitles generates an MPD with subtitle tracks

func (*DASHGenerator) ValidateLiveManifest

func (g *DASHGenerator) ValidateLiveManifest(content string) error

ValidateLiveManifest validates a live DASH manifest for compliance

func (*DASHGenerator) ValidateMPD

func (g *DASHGenerator) ValidateMPD(content string) error

ValidateMPD validates a generated MPD

type DASHManifest

type DASHManifest struct {
	MediaID         string
	Duration        float64
	MinBufferTime   float64
	Representations []DASHRepresentation
	ManifestURL     string
	GeneratedAt     time.Time
	CacheDuration   time.Duration

	// Live streaming specific fields
	IsLive                     bool
	AvailabilityStartTime      time.Time
	PublishTime                time.Time
	TimeShiftBufferDepth       float64 // in seconds
	SuggestedPresentationDelay float64 // in seconds
	MinimumUpdatePeriod        float64 // in seconds
}

DASHManifest represents a DASH MPD (Media Presentation Description)

type DASHRepresentation

type DASHRepresentation struct {
	ID              string
	Quality         Quality
	Bandwidth       int
	Width           int
	Height          int
	Codecs          string
	BaseURL         string
	SegmentTemplate string
}

DASHRepresentation represents a quality representation in DASH

type Event

type Event struct {
	SessionID string
	UserID    string
	MediaID   string
	EventType EventType
	Timestamp time.Time

	// Event-specific data
	Quality         Quality
	PreviousQuality Quality
	Duration        time.Duration
	BytesLoaded     int64
	ErrorCode       string
	BufferHealth    float64
	Bandwidth       int
}

Event represents a streaming event for metrics tracking

type EventType

type EventType string

EventType represents types of streaming events

const (
	// EventSessionStart indicates a streaming session has started
	EventSessionStart EventType = "session_start"
	// EventQualitySwitch indicates a quality level change
	EventQualitySwitch EventType = "quality_switch"
	// EventRebufferStart indicates buffering has started
	EventRebufferStart EventType = "rebuffer_start"
	// EventRebufferEnd indicates buffering has ended
	EventRebufferEnd EventType = "rebuffer_end"
	// EventSegmentLoad indicates a segment was successfully loaded
	EventSegmentLoad EventType = "segment_load"
	// EventSegmentFail indicates a segment failed to load
	EventSegmentFail EventType = "segment_fail"
	// EventSessionEnd indicates a streaming session has ended
	EventSessionEnd EventType = "session_end"
)

Streaming event types

type HLSGenerator

type HLSGenerator struct {
	// contains filtered or unexported fields
}

HLSGenerator handles HLS manifest generation

func NewHLSGenerator

func NewHLSGenerator(config *StreamingConfig, storage MediaStorage) *HLSGenerator

NewHLSGenerator creates a new HLS generator

func (*HLSGenerator) GenerateIFrameMasterPlaylist

func (g *HLSGenerator) GenerateIFrameMasterPlaylist(manifest *HLSManifest) string

GenerateIFrameMasterPlaylist generates a master playlist with I-frame variants for trick play

func (*HLSGenerator) GenerateIFramePlaylist

func (g *HLSGenerator) GenerateIFramePlaylist(mediaID string, quality Quality, metadata *MediaMetadata) string

GenerateIFramePlaylist generates an I-frame only playlist for trick play with precise keyframe positioning

func (*HLSGenerator) GenerateLivePlaylist

func (g *HLSGenerator) GenerateLivePlaylist(mediaID string, quality Quality, startSegment, windowSize int) string

GenerateLivePlaylist generates a live/sliding window playlist

func (*HLSGenerator) GenerateMasterPlaylist

func (g *HLSGenerator) GenerateMasterPlaylist(mediaID string, metadata *MediaMetadata) (*HLSManifest, error)

GenerateMasterPlaylist generates the HLS master playlist

func (*HLSGenerator) GenerateMasterPlaylistContent

func (g *HLSGenerator) GenerateMasterPlaylistContent(manifest *HLSManifest) string

GenerateMasterPlaylistContent generates the actual M3U8 content for the master playlist

func (*HLSGenerator) GenerateMasterPlaylistWithSubtitles

func (g *HLSGenerator) GenerateMasterPlaylistWithSubtitles(manifest *HLSManifest, subtitles []SubtitleTrack) string

GenerateMasterPlaylistWithSubtitles generates a master playlist including subtitle tracks

func (*HLSGenerator) GenerateVariantPlaylist

func (g *HLSGenerator) GenerateVariantPlaylist(mediaID string, quality Quality, metadata *MediaMetadata) string

GenerateVariantPlaylist generates a variant playlist for a specific quality

func (*HLSGenerator) GenerateWebVTTPlaylist

func (g *HLSGenerator) GenerateWebVTTPlaylist(mediaID string, language string, segments int) string

GenerateWebVTTPlaylist generates a playlist for subtitles/captions

func (*HLSGenerator) ValidatePlaylist

func (g *HLSGenerator) ValidatePlaylist(content string) error

ValidatePlaylist performs basic validation on generated playlists

type HLSManifest

type HLSManifest struct {
	MediaID       string
	Duration      float64
	Variants      []HLSVariant
	MasterURL     string
	GeneratedAt   time.Time
	CacheDuration time.Duration
}

HLSManifest represents an HLS master playlist

type HLSVariant

type HLSVariant struct {
	Quality     Quality
	Bandwidth   int
	Resolution  string
	PlaylistURL string
	Codecs      string
}

HLSVariant represents a quality variant in HLS

type Keyframe

type Keyframe struct {
	PTS        float64 // Presentation timestamp in seconds
	ByteOffset int64   // Byte offset in the file
	ByteLength int64   // Length of the I-frame data in bytes
	Duration   float64 // Duration until next keyframe
	URI        string  // URI of the segment containing this I-frame
}

Keyframe represents an I-frame position in the media stream

type KeyframeEntry

type KeyframeEntry struct {
	PTS        float64 `json:"pts"`         // Presentation timestamp in seconds
	DTS        float64 `json:"dts"`         // Decode timestamp in seconds
	ByteOffset int64   `json:"byte_offset"` // Byte offset in the file
	ByteLength int64   `json:"byte_size"`   // Size of I-frame data in bytes
	FrameNum   int     `json:"frame_num"`   // Frame number
	Segment    int     `json:"segment"`     // Which HLS segment contains this keyframe
}

KeyframeEntry represents a single keyframe in JSON format

type KeyframeMetadata

type KeyframeMetadata struct {
	MediaID   string          `json:"media_id"`
	Quality   string          `json:"quality"`
	Keyframes []KeyframeEntry `json:"keyframes"`
	GOP       int             `json:"gop_size,omitempty"`  // Group of Pictures size
	Framerate float64         `json:"framerate,omitempty"` // Video framerate
	Duration  float64         `json:"duration,omitempty"`  // Total duration in seconds
	Codec     string          `json:"codec,omitempty"`     // Video codec (H264, H265, etc.)
}

KeyframeMetadata represents JSON keyframe metadata format

type LiveMPD

type LiveMPD struct {
	XMLName                    xml.Name `xml:"MPD"`
	XMLNS                      string   `xml:"xmlns,attr"`
	Type                       string   `xml:"type,attr"`
	AvailabilityStartTime      string   `xml:"availabilityStartTime,attr"`
	PublishTime                string   `xml:"publishTime,attr"`
	MinimumUpdatePeriod        string   `xml:"minimumUpdatePeriod,attr,omitempty"`
	TimeShiftBufferDepth       string   `xml:"timeShiftBufferDepth,attr,omitempty"`
	SuggestedPresentationDelay string   `xml:"suggestedPresentationDelay,attr,omitempty"`
	MinBufferTime              string   `xml:"minBufferTime,attr"`
	Profiles                   string   `xml:"profiles,attr"`
	Periods                    []Period `xml:"Period"`
}

LiveMPD represents a live Media Presentation Description for DASH streaming

type LiveSegmentTemplate

type LiveSegmentTemplate struct {
	Media                  string  `xml:"media,attr"`
	Initialization         string  `xml:"initialization,attr"`
	Duration               int     `xml:"duration,attr,omitempty"`
	Timescale              int     `xml:"timescale,attr"`
	AvailabilityTimeOffset float64 `xml:"availabilityTimeOffset,attr,omitempty"`
	PresentationTimeOffset int64   `xml:"presentationTimeOffset,attr,omitempty"`
	StartNumber            int     `xml:"startNumber,attr,omitempty"`
}

LiveSegmentTemplate represents a segment template for live DASH streaming

type MPD

type MPD struct {
	XMLName                   xml.Name `xml:"MPD"`
	XMLNS                     string   `xml:"xmlns,attr"`
	Type                      string   `xml:"type,attr"`
	MediaPresentationDuration string   `xml:"mediaPresentationDuration,attr"`
	MinBufferTime             string   `xml:"minBufferTime,attr"`
	Profiles                  string   `xml:"profiles,attr"`
	Periods                   []Period `xml:"Period"`
}

MPD represents a DASH Media Presentation Description document

type MediaFormat

type MediaFormat = types.MediaFormat

MediaFormat represents streaming media format

type MediaMetadata

type MediaMetadata struct {
	MediaID            string
	OriginalURL        string
	Duration           float64
	Width              int
	Height             int
	Bitrate            int
	FileSize           int64
	ProcessedAt        time.Time
	AvailableQualities []Quality
	Status             ProcessingStatus

	// Codec information for HLS/DASH manifest generation
	VideoCodec      string                       `json:"video_codec,omitempty"`      // e.g., "avc1.640028"
	AudioCodec      string                       `json:"audio_codec,omitempty"`      // e.g., "mp4a.40.2"
	VideoProfile    string                       `json:"video_profile,omitempty"`    // e.g., "High", "Main", "Baseline"
	VideoLevel      string                       `json:"video_level,omitempty"`      // e.g., "4.0", "3.1"
	QualitySettings map[Quality]QualityCodecInfo `json:"quality_settings,omitempty"` // Per-quality codec info

	// Live streaming specific fields
	IsLive    bool      `json:"is_live,omitempty"`
	StartTime time.Time `json:"start_time,omitempty"`

	// I-frame/keyframe information for trick play
	KeyframePositions []float64 `json:"keyframe_positions,omitempty"` // PTS positions of keyframes in seconds
}

MediaMetadata contains metadata about a media file

type MediaSessionRepository

type MediaSessionRepository interface {
	CreateSession(ctx context.Context, session *StreamingSession) error
	GetSession(ctx context.Context, sessionID string) (*StreamingSession, error)
	UpdateSession(ctx context.Context, session *StreamingSession) error
	EndSession(ctx context.Context, sessionID string) error
	GetUserSessions(ctx context.Context, userID string) ([]*StreamingSession, error)
	GetMediaSessions(ctx context.Context, mediaID string, limit int32) ([]*StreamingSession, error)
	CleanupExpiredSessions(ctx context.Context, maxAge time.Duration) error
	SetSessionTTL(ttl time.Duration)
}

MediaSessionRepository interface for session persistence

type MediaStorage

type MediaStorage interface {
	GetManifestPath(mediaID string, format MediaFormat, quality Quality) string
	GetSegmentPath(mediaID string, quality Quality, segmentIndex int) string
	GetMediaMetadata(mediaID string) (*MediaMetadata, error)
	ManifestExists(mediaID string, format MediaFormat) (bool, error)
	GetKeyframeData(mediaID string, quality Quality) ([]byte, error)
}

MediaStorage interface for interacting with media files

type MediaStreamer

type MediaStreamer interface {
	// Manifest generation
	GenerateHLSManifest(mediaID string) (*HLSManifest, error)
	GenerateDASHManifest(mediaID string) (*DASHManifest, error)

	// Segment management
	GetSegmentURL(mediaID string, quality Quality, segment int) (string, error)
	GetSegmentURLs(mediaID string, quality Quality, startSegment, count int) ([]string, error)

	// Bandwidth tracking
	TrackBandwidth(userID string, bytesTransferred int64) error
	GetBandwidthStats(userID string) (*BandwidthStats, error)

	// Quality selection
	GetOptimalQuality(userID string, availableBandwidth int) Quality
	GetAvailableQualities(mediaID string) ([]QualityInfo, error)

	// Session management
	StartSession(userID, mediaID string, format MediaFormat) (*StreamingSession, error)
	UpdateSession(sessionID string, quality Quality, segmentIndex int, bytesTransferred int64) error
	EndSession(sessionID string) error
	GetSession(sessionID string) (*StreamingSession, error)
}

MediaStreamer is the main interface for media streaming functionality

type MetricsTracker

type MetricsTracker struct {
	// contains filtered or unexported fields
}

MetricsTracker implements comprehensive streaming metrics tracking with CloudWatch integration

func NewMetricsTracker

func NewMetricsTracker(cloudWatch *cloudwatch.Client, logger *zap.Logger) *MetricsTracker

NewMetricsTracker creates a new metrics tracker with CloudWatch integration

func (*MetricsTracker) Cleanup

func (smt *MetricsTracker) Cleanup(maxAge time.Duration)

Cleanup removes old session metrics to prevent memory leaks

func (*MetricsTracker) EndSession

func (smt *MetricsTracker) EndSession(sessionID string) *SessionMetrics

EndSession finalizes metrics tracking for a session

func (*MetricsTracker) GetSessionMetrics

func (smt *MetricsTracker) GetSessionMetrics(sessionID string) *SessionMetrics

GetSessionMetrics retrieves current metrics for a session

func (*MetricsTracker) StartSession

func (smt *MetricsTracker) StartSession(sessionID, userID, mediaID string)

StartSession initializes metrics tracking for a new session

func (*MetricsTracker) TrackBufferHealth

func (smt *MetricsTracker) TrackBufferHealth(sessionID string, bufferLevel time.Duration, bandwidth int)

TrackBufferHealth records buffer health samples for adaptive bitrate decisions

func (*MetricsTracker) TrackQualitySwitch

func (smt *MetricsTracker) TrackQualitySwitch(sessionID string, fromQuality, toQuality Quality)

TrackQualitySwitch records a quality change event

func (*MetricsTracker) TrackRebufferEvent

func (smt *MetricsTracker) TrackRebufferEvent(sessionID string, duration time.Duration)

TrackRebufferEvent records rebuffering events

func (*MetricsTracker) TrackSegmentFailure

func (smt *MetricsTracker) TrackSegmentFailure(sessionID string, segmentIndex int, errorCode string)

TrackSegmentFailure records segment loading failures

func (*MetricsTracker) TrackSegmentLoad

func (smt *MetricsTracker) TrackSegmentLoad(sessionID string, segmentIndex int, loadTime time.Duration, bytes int64)

TrackSegmentLoad records successful segment loading

type Period

type Period struct {
	ID             string          `xml:"id,attr"`
	Start          string          `xml:"start,attr,omitempty"`
	Duration       string          `xml:"duration,attr,omitempty"`
	AdaptationSets []AdaptationSet `xml:"AdaptationSet"`
}

Period represents a period within a DASH manifest

type Preferences

type Preferences struct {
	Username                string `json:"username"`
	DeviceID                string `json:"device_id,omitempty"`
	DefaultQuality          string `json:"default_quality,omitempty"`
	AutoQuality             bool   `json:"auto_quality"`
	PreloadNext             bool   `json:"preload_next"`
	DataSaverMode           bool   `json:"data_saver_mode"`
	PreferredCodec          string `json:"preferred_codec,omitempty"`
	MaxBandwidthMbps        int64  `json:"max_bandwidth_mbps,omitempty"`
	BufferSizeSeconds       int    `json:"buffer_size_seconds,omitempty"`
	HDREnabled              bool   `json:"hdr_enabled"`
	ColorSpace              string `json:"color_space,omitempty"`
	SubtitleEnabled         bool   `json:"subtitle_enabled"`
	SubtitleLanguage        string `json:"subtitle_language,omitempty"`
	AudioDescriptionEnabled bool   `json:"audio_description_enabled"`
	ClosedCaptionsEnabled   bool   `json:"closed_captions_enabled"`
	Version                 int    `json:"version"`
	SchemaVersion           int    `json:"schema_version"`
}

Preferences represents user streaming preferences for the streamer This is a local type alias to avoid circular imports

type ProcessingStatus

type ProcessingStatus string

ProcessingStatus represents the processing status of media

const (
	StatusPending    ProcessingStatus = "pending"
	StatusProcessing ProcessingStatus = "processing"
	StatusComplete   ProcessingStatus = "complete"
	StatusFailed     ProcessingStatus = "failed"
)

Processing status constants

type Quality

type Quality = types.Quality

Quality represents video quality level

func GetQualitiesByBandwidth

func GetQualitiesByBandwidth(bandwidth int) []Quality

GetQualitiesByBandwidth returns qualities that can be supported by the given bandwidth

type QualityCodecInfo

type QualityCodecInfo struct {
	VideoCodec string `json:"video_codec"` // H.264 profile/level string like "avc1.640028"
	AudioCodec string `json:"audio_codec"` // Audio codec string like "mp4a.40.2"
	Bandwidth  int    `json:"bandwidth"`   // Required bandwidth in bps
	Width      int    `json:"width"`       // Video width in pixels
	Height     int    `json:"height"`      // Video height in pixels
}

QualityCodecInfo contains codec information for a specific quality level

type QualityInfo

type QualityInfo struct {
	Quality    Quality
	Width      int
	Height     int
	Bitrate    int    // in kbps
	Bandwidth  int    // required bandwidth in kbps
	Resolution string // e.g., "1920x1080"
}

QualityInfo contains information about a quality level

func GetQualityInfo

func GetQualityInfo(quality Quality) QualityInfo

GetQualityInfo returns quality information for a given quality level

type QualityMetrics

type QualityMetrics struct {
	SessionID         string
	AverageQuality    Quality
	QualitySwitches   int
	RebufferEvents    int
	TimeInEachQuality map[Quality]time.Duration
	LastQualityChange time.Time
}

QualityMetrics tracks quality selection metrics

type QualitySelector

type QualitySelector interface {
	SelectQuality(bandwidth int, bufferHealth float64, availableQualities []Quality) Quality
	UpdateMetrics(sessionID string, rebufferEvents int, qualitySwitches int)
	GetQualityMetrics(sessionID string) *QualityMetrics
}

QualitySelector interface for adaptive bitrate selection

type Representation

type Representation struct {
	ID              string      `xml:"id,attr"`
	Bandwidth       int         `xml:"bandwidth,attr"`
	Width           int         `xml:"width,attr"`
	Height          int         `xml:"height,attr"`
	Codecs          string      `xml:"codecs,attr"`
	BaseURL         BaseURL     `xml:"BaseURL"`
	SegmentTemplate interface{} `xml:"SegmentTemplate"`
}

Representation represents a single encoded version of the content

type S3MediaStorage

type S3MediaStorage struct {
	// contains filtered or unexported fields
}

S3MediaStorage implements MediaStorage using S3 for files and DynamORM for metadata

func NewS3MediaStorage

func NewS3MediaStorage(client *s3.Client, bucket, region string, db core.DB) *S3MediaStorage

NewS3MediaStorage creates a new S3-based media storage with DynamORM for metadata

func (*S3MediaStorage) CreateMediaStructure

func (s *S3MediaStorage) CreateMediaStructure(mediaID string, qualities []Quality) error

CreateMediaStructure creates the S3 structure for a new media item with real artifacts

func (*S3MediaStorage) GetCloudFrontDomain

func (s *S3MediaStorage) GetCloudFrontDomain() string

GetCloudFrontDomain returns the configured CloudFront domain

func (*S3MediaStorage) GetKeyframeData

func (s *S3MediaStorage) GetKeyframeData(mediaID string, quality Quality) ([]byte, error)

GetKeyframeData retrieves keyframe/I-frame data for a media item at a specific quality level

func (*S3MediaStorage) GetManifestPath

func (s *S3MediaStorage) GetManifestPath(mediaID string, format MediaFormat, quality Quality) string

GetManifestPath returns the S3 path for a manifest file

func (*S3MediaStorage) GetMediaMetadata

func (s *S3MediaStorage) GetMediaMetadata(mediaID string) (*MediaMetadata, error)

GetMediaMetadata retrieves metadata for a media item

func (*S3MediaStorage) GetPresignedUploadURL

func (s *S3MediaStorage) GetPresignedUploadURL(mediaID string, filename string) (string, error)

GetPresignedUploadURL generates a presigned URL for uploading media

func (*S3MediaStorage) GetSegmentInfo

func (s *S3MediaStorage) GetSegmentInfo(mediaID string, quality Quality, segmentIndex int) (*Segment, error)

GetSegmentInfo retrieves information about a segment

func (*S3MediaStorage) GetSegmentPath

func (s *S3MediaStorage) GetSegmentPath(mediaID string, quality Quality, segmentIndex int) string

GetSegmentPath returns the S3 path for a segment file

func (*S3MediaStorage) IsCloudFrontEnabled

func (s *S3MediaStorage) IsCloudFrontEnabled() bool

IsCloudFrontEnabled returns true if CloudFront is properly configured

func (*S3MediaStorage) ListSegments

func (s *S3MediaStorage) ListSegments(mediaID string, quality Quality) ([]*Segment, error)

ListSegments lists all segments for a quality level

func (*S3MediaStorage) ManifestExists

func (s *S3MediaStorage) ManifestExists(mediaID string, format MediaFormat) (bool, error)

ManifestExists checks if a manifest file exists

func (*S3MediaStorage) SaveManifest

func (s *S3MediaStorage) SaveManifest(mediaID string, format MediaFormat, quality Quality, content string) error

SaveManifest saves a manifest to S3

func (*S3MediaStorage) UpdateMediaMetadata

func (s *S3MediaStorage) UpdateMediaMetadata(mediaID string, metadata *MediaMetadata) error

UpdateMediaMetadata updates the metadata for a media item

type Segment

type Segment struct {
	Index    int
	Duration float64 // in seconds
	URL      string
	Size     int64 // in bytes
}

Segment represents a media segment

type SegmentTemplate

type SegmentTemplate struct {
	Media          string `xml:"media,attr"`
	Initialization string `xml:"initialization,attr"`
	StartNumber    int    `xml:"startNumber,attr"`
	Duration       int    `xml:"duration,attr"`
	Timescale      int    `xml:"timescale,attr"`
}

SegmentTemplate defines how media segments are addressed

type SessionManager

type SessionManager struct {
	// contains filtered or unexported fields
}

SessionManager manages streaming sessions Uses DynamoDB TTL for automatic session expiration in serverless environments

func NewSessionManager

func NewSessionManager(repo MediaSessionRepository, logger *zap.Logger, costTracker CostTracker) *SessionManager

NewSessionManager creates a new session manager

func (*SessionManager) CleanupExpiredSessions

func (sm *SessionManager) CleanupExpiredSessions(ctx context.Context, maxAge time.Duration) error

CleanupExpiredSessions removes sessions older than the specified duration NOTE: This method is optional in serverless environments. DynamoDB TTL will automatically remove expired sessions. This method can be used for immediate cleanup if needed, but incurs additional DynamoDB scan costs.

func (*SessionManager) CreateSession

func (sm *SessionManager) CreateSession(ctx context.Context, session *StreamingSession) error

CreateSession creates a new streaming session

func (*SessionManager) EndSession

func (sm *SessionManager) EndSession(ctx context.Context, sessionID string) error

EndSession marks a session as ended

func (*SessionManager) GetMediaSessions

func (sm *SessionManager) GetMediaSessions(ctx context.Context, mediaID string, limit int32) ([]*StreamingSession, error)

GetMediaSessions retrieves sessions for a specific media item

func (*SessionManager) GetSession

func (sm *SessionManager) GetSession(ctx context.Context, sessionID string) (*StreamingSession, error)

GetSession retrieves a streaming session

func (*SessionManager) GetUserSessions

func (sm *SessionManager) GetUserSessions(ctx context.Context, userID string) ([]*StreamingSession, error)

GetUserSessions retrieves active sessions for a user

func (*SessionManager) SetSessionTTL

func (sm *SessionManager) SetSessionTTL(ttl time.Duration)

SetSessionTTL configures the TTL for streaming sessions Common values: - 6 hours for media streaming sessions - 24 hours for analytics data - 7 days for historical metrics

Example usage:

sm.SetSessionTTL(6 * time.Hour)  // Short-lived media sessions
sm.SetSessionTTL(24 * time.Hour) // Analytics data (default)
sm.SetSessionTTL(7 * 24 * time.Hour) // Historical metrics

func (*SessionManager) UpdateSession

func (sm *SessionManager) UpdateSession(ctx context.Context, session *StreamingSession) error

UpdateSession updates a streaming session

type SessionMetrics

type SessionMetrics struct {
	SessionID  string
	UserID     string
	MediaID    string
	StartTime  time.Time
	LastUpdate time.Time

	// Quality tracking
	CurrentQuality    Quality
	QualitySwitches   int
	TimeInEachQuality map[Quality]time.Duration
	LastQualityChange time.Time

	// Rebuffer tracking
	RebufferEvents    int
	TotalRebufferTime time.Duration
	LastRebufferTime  time.Time

	// Performance metrics
	StartupTime        time.Duration
	SegmentSuccessRate float64
	TotalSegments      int
	FailedSegments     int
	BytesTransferred   int64

	// Buffer health history
	BufferHealthHistory []BufferHealthSample

	// QoE calculation
	QoEScore      float64
	LastQoEUpdate time.Time
}

SessionMetrics tracks detailed metrics for a streaming session

type Streamer

type Streamer struct {
	// contains filtered or unexported fields
}

Streamer implements the MediaStreamer interface

func NewStreamer

func NewStreamer(
	config *StreamingConfig,
	analytics core.RepositoryStorage,
	s3Client *s3.Client,
	cloudWatch *cloudwatch.Client,
	db dynamormCore.DB,
	logger *zap.Logger,
	costTracker CostTracker,
) *Streamer

NewStreamer creates a new media streamer

func (*Streamer) EndSession

func (s *Streamer) EndSession(sessionID string) error

EndSession ends a streaming session

func (*Streamer) GenerateDASHManifest

func (s *Streamer) GenerateDASHManifest(mediaID string) (*DASHManifest, error)

GenerateDASHManifest generates a DASH manifest for a media item

func (*Streamer) GenerateHLSManifest

func (s *Streamer) GenerateHLSManifest(mediaID string) (*HLSManifest, error)

GenerateHLSManifest generates an HLS manifest for a media item

func (*Streamer) GetAvailableQualities

func (s *Streamer) GetAvailableQualities(mediaID string) ([]QualityInfo, error)

GetAvailableQualities returns available qualities for a media item

func (*Streamer) GetAvailableQualitiesForUser

func (s *Streamer) GetAvailableQualitiesForUser(mediaID, userID string) ([]QualityInfo, error)

GetAvailableQualitiesForUser returns available qualities for a media item filtered by user preferences

func (*Streamer) GetBandwidthStats

func (s *Streamer) GetBandwidthStats(userID string) (*BandwidthStats, error)

GetBandwidthStats retrieves bandwidth statistics

func (*Streamer) GetOptimalQuality

func (s *Streamer) GetOptimalQuality(userID string, availableBandwidth int) Quality

GetOptimalQuality determines the best quality for a user based on preferences and bandwidth

func (*Streamer) GetOptimalQualityWithPreferences

func (s *Streamer) GetOptimalQualityWithPreferences(userID string, availableBandwidth int, _ string, userPreferences *Preferences) Quality

GetOptimalQualityWithPreferences determines the best quality considering user preferences

func (*Streamer) GetSegmentURL

func (s *Streamer) GetSegmentURL(mediaID string, quality Quality, segment int) (string, error)

GetSegmentURL returns the URL for a specific segment

func (*Streamer) GetSegmentURLs

func (s *Streamer) GetSegmentURLs(mediaID string, quality Quality, startSegment, count int) ([]string, error)

GetSegmentURLs returns URLs for multiple segments

func (*Streamer) GetSession

func (s *Streamer) GetSession(sessionID string) (*StreamingSession, error)

GetSession retrieves a streaming session

func (*Streamer) SetSessionManager

func (s *Streamer) SetSessionManager(sessionManager *SessionManager)

SetSessionManager sets the session manager (for dependency injection)

func (*Streamer) StartSession

func (s *Streamer) StartSession(userID, mediaID string, format MediaFormat) (*StreamingSession, error)

StartSession starts a new streaming session Sessions are automatically cleaned up after 24 hours using DynamoDB TTL

func (*Streamer) StartSessionWithPreferences

func (s *Streamer) StartSessionWithPreferences(userID, mediaID string, format MediaFormat, _ string, userPreferences *Preferences) (*StreamingSession, error)

StartSessionWithPreferences starts a new streaming session with user preferences

func (*Streamer) TrackBandwidth

func (s *Streamer) TrackBandwidth(userID string, bytesTransferred int64) error

TrackBandwidth records bandwidth usage

func (*Streamer) UpdateSession

func (s *Streamer) UpdateSession(sessionID string, quality Quality, segmentIndex int, bytesTransferred int64) error

UpdateSession updates an active session

type StreamingConfig

type StreamingConfig struct {
	CDNBaseURL         string
	S3Bucket           string
	S3Region           string
	SegmentDuration    int // in seconds
	ManifestCacheTTL   time.Duration
	MaxConcurrentJobs  int
	EnableCostTracking bool
	DefaultQuality     Quality

	// Bandwidth thresholds for quality selection (in kbps)
	Bandwidth4K    int
	Bandwidth1080p int
	Bandwidth720p  int
	Bandwidth480p  int
	Bandwidth360p  int
	Bandwidth240p  int
}

StreamingConfig holds configuration for the streaming service

type StreamingError

type StreamingError struct {
	Code    string
	Message string
	MediaID string
	Details map[string]any
}

StreamingError represents an error in streaming operations

func (*StreamingError) Error

func (e *StreamingError) Error() string

type StreamingSession

type StreamingSession = types.StreamingSession

StreamingSession represents an active streaming session

type SubtitleTrack

type SubtitleTrack struct {
	Name       string
	Language   string
	URI        string
	Default    bool
	AutoSelect bool
	Forced     bool
}

SubtitleTrack represents a subtitle/caption track

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL