Documentation
¶
Overview ¶
Package streaming provides serverless-optimized media streaming functionality.
SERVERLESS DESIGN PRINCIPLES:
No Background Processes: This package avoids long-running goroutines, timers, or polling mechanisms that are incompatible with Lambda's execution model.
DynamoDB TTL for Cleanup: Session and cache expiration is handled automatically by DynamoDB TTL rather than manual cleanup processes. This eliminates the need for background cleanup tasks.
Stateless Operations: Each Lambda invocation operates independently without relying on shared in-memory state between invocations.
Cost-Optimized: Uses DynamoDB on-demand billing and avoids unnecessary operations that would increase costs in a serverless environment.
Index ¶
- Constants
- Variables
- func GetSegmentDurationFromPlaylist(playlistContent string) (float64, error)
- type AdaptationSet
- type AdaptiveQualitySelector
- func (aqs *AdaptiveQualitySelector) CleanupMetrics(maxAge time.Duration)
- func (aqs *AdaptiveQualitySelector) GetQualityMetrics(sessionID string) *QualityMetrics
- func (aqs *AdaptiveQualitySelector) SelectQuality(bandwidth int, bufferHealth float64, availableQualities []Quality) Quality
- func (aqs *AdaptiveQualitySelector) SelectQualityWithSession(sessionID string, bandwidth int, bufferHealth float64, ...) Quality
- func (aqs *AdaptiveQualitySelector) SetMetricsTracker(metricsTracker *MetricsTracker)
- func (aqs *AdaptiveQualitySelector) UpdateMetrics(sessionID string, rebufferEvents int, qualitySwitches int)
- type AudioTrack
- type BandwidthMeasurement
- type BandwidthStats
- type BandwidthTracker
- func (bt *BandwidthTracker) GetBandwidthHistory(ctx context.Context, userID string, duration time.Duration) ([]BandwidthMeasurement, error)
- func (bt *BandwidthTracker) GetBandwidthStats(ctx context.Context, userID string) (*BandwidthStats, error)
- func (bt *BandwidthTracker) GetOptimalQuality(ctx context.Context, userID string, availableBandwidth int) Quality
- func (bt *BandwidthTracker) RecordBandwidthMeasurement(ctx context.Context, userID string, bandwidth int) error
- func (bt *BandwidthTracker) TrackBandwidth(ctx context.Context, userID string, bytesTransferred int64) error
- type BaseURL
- type BufferHealthSample
- type CostTracker
- type DASHGenerator
- func (g *DASHGenerator) GenerateAudioAdaptationSet(audioTracks []AudioTrack) AdaptationSet
- func (g *DASHGenerator) GenerateLiveMPD(mediaID string, windowSize int) (*DASHManifest, error)
- func (g *DASHGenerator) GenerateLiveMPDContent(manifest *DASHManifest) (string, error)
- func (g *DASHGenerator) GenerateLiveMPDContentWithSubtitles(manifest *DASHManifest, subtitles []SubtitleTrack) (string, error)
- func (g *DASHGenerator) GenerateMPD(mediaID string, metadata *MediaMetadata) (*DASHManifest, error)
- func (g *DASHGenerator) GenerateMPDContent(manifest *DASHManifest) (string, error)
- func (g *DASHGenerator) GenerateMPDWithSubtitles(manifest *DASHManifest, subtitles []SubtitleTrack) (string, error)
- func (g *DASHGenerator) ValidateLiveManifest(content string) error
- func (g *DASHGenerator) ValidateMPD(content string) error
- type DASHManifest
- type DASHRepresentation
- type Event
- type EventType
- type HLSGenerator
- func (g *HLSGenerator) GenerateIFrameMasterPlaylist(manifest *HLSManifest) string
- func (g *HLSGenerator) GenerateIFramePlaylist(mediaID string, quality Quality, metadata *MediaMetadata) string
- func (g *HLSGenerator) GenerateLivePlaylist(mediaID string, quality Quality, startSegment, windowSize int) string
- func (g *HLSGenerator) GenerateMasterPlaylist(mediaID string, metadata *MediaMetadata) (*HLSManifest, error)
- func (g *HLSGenerator) GenerateMasterPlaylistContent(manifest *HLSManifest) string
- func (g *HLSGenerator) GenerateMasterPlaylistWithSubtitles(manifest *HLSManifest, subtitles []SubtitleTrack) string
- func (g *HLSGenerator) GenerateVariantPlaylist(mediaID string, quality Quality, metadata *MediaMetadata) string
- func (g *HLSGenerator) GenerateWebVTTPlaylist(mediaID string, language string, segments int) string
- func (g *HLSGenerator) ValidatePlaylist(content string) error
- type HLSManifest
- type HLSVariant
- type Keyframe
- type KeyframeEntry
- type KeyframeMetadata
- type LiveMPD
- type LiveSegmentTemplate
- type MPD
- type MediaFormat
- type MediaMetadata
- type MediaSessionRepository
- type MediaStorage
- type MediaStreamer
- type MetricsTracker
- func (smt *MetricsTracker) Cleanup(maxAge time.Duration)
- func (smt *MetricsTracker) EndSession(sessionID string) *SessionMetrics
- func (smt *MetricsTracker) GetSessionMetrics(sessionID string) *SessionMetrics
- func (smt *MetricsTracker) StartSession(sessionID, userID, mediaID string)
- func (smt *MetricsTracker) TrackBufferHealth(sessionID string, bufferLevel time.Duration, bandwidth int)
- func (smt *MetricsTracker) TrackQualitySwitch(sessionID string, fromQuality, toQuality Quality)
- func (smt *MetricsTracker) TrackRebufferEvent(sessionID string, duration time.Duration)
- func (smt *MetricsTracker) TrackSegmentFailure(sessionID string, segmentIndex int, errorCode string)
- func (smt *MetricsTracker) TrackSegmentLoad(sessionID string, segmentIndex int, loadTime time.Duration, bytes int64)
- type Period
- type Preferences
- type ProcessingStatus
- type Quality
- type QualityCodecInfo
- type QualityInfo
- type QualityMetrics
- type QualitySelector
- type Representation
- type S3MediaStorage
- func (s *S3MediaStorage) CreateMediaStructure(mediaID string, qualities []Quality) error
- func (s *S3MediaStorage) GetCloudFrontDomain() string
- func (s *S3MediaStorage) GetKeyframeData(mediaID string, quality Quality) ([]byte, error)
- func (s *S3MediaStorage) GetManifestPath(mediaID string, format MediaFormat, quality Quality) string
- func (s *S3MediaStorage) GetMediaMetadata(mediaID string) (*MediaMetadata, error)
- func (s *S3MediaStorage) GetPresignedUploadURL(mediaID string, filename string) (string, error)
- func (s *S3MediaStorage) GetSegmentInfo(mediaID string, quality Quality, segmentIndex int) (*Segment, error)
- func (s *S3MediaStorage) GetSegmentPath(mediaID string, quality Quality, segmentIndex int) string
- func (s *S3MediaStorage) IsCloudFrontEnabled() bool
- func (s *S3MediaStorage) ListSegments(mediaID string, quality Quality) ([]*Segment, error)
- func (s *S3MediaStorage) ManifestExists(mediaID string, format MediaFormat) (bool, error)
- func (s *S3MediaStorage) SaveManifest(mediaID string, format MediaFormat, quality Quality, content string) error
- func (s *S3MediaStorage) UpdateMediaMetadata(mediaID string, metadata *MediaMetadata) error
- type Segment
- type SegmentTemplate
- type SessionManager
- func (sm *SessionManager) CleanupExpiredSessions(ctx context.Context, maxAge time.Duration) error
- func (sm *SessionManager) CreateSession(ctx context.Context, session *StreamingSession) error
- func (sm *SessionManager) EndSession(ctx context.Context, sessionID string) error
- func (sm *SessionManager) GetMediaSessions(ctx context.Context, mediaID string, limit int32) ([]*StreamingSession, error)
- func (sm *SessionManager) GetSession(ctx context.Context, sessionID string) (*StreamingSession, error)
- func (sm *SessionManager) GetUserSessions(ctx context.Context, userID string) ([]*StreamingSession, error)
- func (sm *SessionManager) SetSessionTTL(ttl time.Duration)
- func (sm *SessionManager) UpdateSession(ctx context.Context, session *StreamingSession) error
- type SessionMetrics
- type Streamer
- func (s *Streamer) EndSession(sessionID string) error
- func (s *Streamer) GenerateDASHManifest(mediaID string) (*DASHManifest, error)
- func (s *Streamer) GenerateHLSManifest(mediaID string) (*HLSManifest, error)
- func (s *Streamer) GetAvailableQualities(mediaID string) ([]QualityInfo, error)
- func (s *Streamer) GetAvailableQualitiesForUser(mediaID, userID string) ([]QualityInfo, error)
- func (s *Streamer) GetBandwidthStats(userID string) (*BandwidthStats, error)
- func (s *Streamer) GetOptimalQuality(userID string, availableBandwidth int) Quality
- func (s *Streamer) GetOptimalQualityWithPreferences(userID string, availableBandwidth int, _ string, userPreferences *Preferences) Quality
- func (s *Streamer) GetSegmentURL(mediaID string, quality Quality, segment int) (string, error)
- func (s *Streamer) GetSegmentURLs(mediaID string, quality Quality, startSegment, count int) ([]string, error)
- func (s *Streamer) GetSession(sessionID string) (*StreamingSession, error)
- func (s *Streamer) SetSessionManager(sessionManager *SessionManager)
- func (s *Streamer) StartSession(userID, mediaID string, format MediaFormat) (*StreamingSession, error)
- func (s *Streamer) StartSessionWithPreferences(userID, mediaID string, format MediaFormat, _ string, ...) (*StreamingSession, error)
- func (s *Streamer) TrackBandwidth(userID string, bytesTransferred int64) error
- func (s *Streamer) UpdateSession(sessionID string, quality Quality, segmentIndex int, bytesTransferred int64) error
- type StreamingConfig
- type StreamingError
- type StreamingSession
- type SubtitleTrack
Constants ¶
const ( QualityAuto = types.QualityAuto QualityLow = types.QualityLow QualityMedium = types.QualityMedium QualityHigh = types.QualityHigh QualitySource = types.QualitySource // Additional quality levels specific to streaming Quality4K Quality = "4k" Quality1080p Quality = "1080p" Quality720p Quality = "720p" Quality480p Quality = "480p" Quality360p Quality = "360p" Quality240p Quality = "240p" )
Re-export constants
const ( FormatHLS = types.FormatHLS FormatDASH = types.FormatDASH FormatSource = types.FormatSource )
Streaming format constants
Variables ¶
var ( ErrMediaMetadataNotFound = errors.NewAppError(errors.CodeNotFound, errors.CategoryMedia, "media metadata not found") ErrGetMetadataFromDynamoDB = errors.FailedToGet("metadata from DynamoDB", stdErrors.New("failed to get metadata from DynamoDB")) ErrCreateMetadataInDynamoDB = errors.FailedToCreate("metadata in DynamoDB", stdErrors.New("failed to create metadata in DynamoDB")) ErrUpdateMetadataInDynamoDB = errors.FailedToUpdate("metadata in DynamoDB", stdErrors.New("failed to update metadata in DynamoDB")) )
Media metadata errors
var ( ErrCheckManifestExists = errors.ProcessingFailed("check manifest exists", stdErrors.New("failed to check manifest existence")) ErrSaveManifestToS3 = errors.ProcessingFailed("save manifest to S3", stdErrors.New("failed to save manifest to S3")) ErrGetSegmentInfo = errors.FailedToGet("segment info", stdErrors.New("failed to get segment info")) ErrListSegments = errors.FailedToList("segments", stdErrors.New("failed to list segments")) ErrCreateIndexForQuality = errors.ProcessingFailed("create index for quality", stdErrors.New("failed to create index for quality")) ErrCreateSegmentsDirectory = errors.ProcessingFailed("create segments directory for quality", stdErrors.New("failed to create segments directory for quality")) ErrCreateMasterIndex = errors.ProcessingFailed("create master index", stdErrors.New("failed to create master index")) ErrGeneratePresignedUploadURL = errors.ProcessingFailed("generate presigned upload URL", stdErrors.New("failed to generate presigned upload URL")) )
S3 operation errors
var ( ErrCloudFrontNotConfigured = errors.ConfigurationMissing("cloudfront_config") ErrInvalidCloudFrontPrivateKeyPath = errors.ConfigurationInvalid("cloudfront_private_key_path", "invalid path") ErrFailedToReadCloudFrontPrivateKeyFile = errors.ProcessingFailed("CloudFront private key file read", stdErrors.New("failed to read CloudFront private key file")) ErrCloudFrontPrivateKeyNotProvided = errors.ConfigurationMissing("cloudfront_private_key") ErrInvalidRSAPrivateKeyPEM = errors.InvalidPrivateKeyType() ErrFailedToParseRSAPrivateKey = errors.ProcessingFailed("RSA private key parsing", stdErrors.New("failed to parse RSA private key")) )
CloudFront configuration errors
var ( ErrFailedToGetKeyframeData = errors.FailedToGet("keyframe data", stdErrors.New("failed to get keyframe data")) ErrFailedToReadIFramePlaylist = errors.ProcessingFailed("I-frame playlist read", stdErrors.New("failed to read I-frame playlist")) ErrFailedToReadKeyframeData = errors.ProcessingFailed("keyframe data read", stdErrors.New("failed to read keyframe data")) )
Keyframe data errors
var ( ErrCreateSession = errors.FailedToCreate("session", stdErrors.New("failed to create session")) ErrGetSession = errors.FailedToGet("session", stdErrors.New("failed to get session")) ErrUpdateSession = errors.FailedToUpdate("session", stdErrors.New("failed to update session")) ErrEndSession = errors.ProcessingFailed("end session", stdErrors.New("failed to end session")) ErrQueryUserSessions = errors.FailedToQuery("user sessions", stdErrors.New("failed to query user sessions")) ErrScanMediaSessions = errors.FailedToQuery("media sessions", stdErrors.New("failed to scan media sessions")) ErrCleanupExpiredSessions = errors.ProcessingFailed("cleanup expired sessions", stdErrors.New("failed to cleanup expired sessions")) )
Session management errors
Functions ¶
func GetSegmentDurationFromPlaylist ¶
GetSegmentDurationFromPlaylist extracts actual segment duration from a playlist
Types ¶
type AdaptationSet ¶
type AdaptationSet struct {
ID string `xml:"id,attr"`
MimeType string `xml:"mimeType,attr"`
SegmentAlignment bool `xml:"segmentAlignment,attr"`
StartWithSAP int `xml:"startWithSAP,attr"`
MaxWidth int `xml:"maxWidth,attr"`
MaxHeight int `xml:"maxHeight,attr"`
MaxFrameRate string `xml:"maxFrameRate,attr"`
Par string `xml:"par,attr"`
Representations []Representation `xml:"Representation"`
}
AdaptationSet represents a set of interchangeable media components
type AdaptiveQualitySelector ¶
type AdaptiveQualitySelector struct {
// contains filtered or unexported fields
}
AdaptiveQualitySelector implements intelligent quality selection
func NewAdaptiveQualitySelector ¶
func NewAdaptiveQualitySelector(logger *zap.Logger) *AdaptiveQualitySelector
NewAdaptiveQualitySelector creates a new quality selector
func (*AdaptiveQualitySelector) CleanupMetrics ¶
func (aqs *AdaptiveQualitySelector) CleanupMetrics(maxAge time.Duration)
CleanupMetrics removes old metrics from cache
func (*AdaptiveQualitySelector) GetQualityMetrics ¶
func (aqs *AdaptiveQualitySelector) GetQualityMetrics(sessionID string) *QualityMetrics
GetQualityMetrics retrieves metrics for a session
func (*AdaptiveQualitySelector) SelectQuality ¶
func (aqs *AdaptiveQualitySelector) SelectQuality(bandwidth int, bufferHealth float64, availableQualities []Quality) Quality
SelectQuality chooses the optimal quality based on bandwidth and buffer health
func (*AdaptiveQualitySelector) SelectQualityWithSession ¶
func (aqs *AdaptiveQualitySelector) SelectQualityWithSession(sessionID string, bandwidth int, bufferHealth float64, availableQualities []Quality) Quality
SelectQualityWithSession chooses the optimal quality using real session metrics
func (*AdaptiveQualitySelector) SetMetricsTracker ¶
func (aqs *AdaptiveQualitySelector) SetMetricsTracker(metricsTracker *MetricsTracker)
SetMetricsTracker sets the metrics tracker for real-time data access
func (*AdaptiveQualitySelector) UpdateMetrics ¶
func (aqs *AdaptiveQualitySelector) UpdateMetrics(sessionID string, rebufferEvents int, qualitySwitches int)
UpdateMetrics updates quality selection metrics
type AudioTrack ¶
AudioTrack represents an audio track
type BandwidthMeasurement ¶
BandwidthMeasurement represents a single bandwidth measurement
type BandwidthStats ¶
type BandwidthStats struct {
UserID string
TotalBytes int64
SessionBytes int64
AverageBandwidth int // in kbps
PeakBandwidth int // in kbps
LastMeasurement time.Time
MeasurementWindow time.Duration
}
BandwidthStats tracks bandwidth usage for a user
type BandwidthTracker ¶
type BandwidthTracker struct {
// contains filtered or unexported fields
}
BandwidthTracker tracks bandwidth usage for users
func NewBandwidthTracker ¶
func NewBandwidthTracker(storage core.RepositoryStorage, logger *zap.Logger, costTracker CostTracker, cloudWatch *cloudwatch.Client) *BandwidthTracker
NewBandwidthTracker creates a new bandwidth tracker
func (*BandwidthTracker) GetBandwidthHistory ¶
func (bt *BandwidthTracker) GetBandwidthHistory(ctx context.Context, userID string, duration time.Duration) ([]BandwidthMeasurement, error)
GetBandwidthHistory retrieves bandwidth measurement history from CloudWatch
func (*BandwidthTracker) GetBandwidthStats ¶
func (bt *BandwidthTracker) GetBandwidthStats(ctx context.Context, userID string) (*BandwidthStats, error)
GetBandwidthStats retrieves bandwidth statistics for a user
func (*BandwidthTracker) GetOptimalQuality ¶
func (bt *BandwidthTracker) GetOptimalQuality(ctx context.Context, userID string, availableBandwidth int) Quality
GetOptimalQuality determines the best quality based on user's bandwidth
func (*BandwidthTracker) RecordBandwidthMeasurement ¶
func (bt *BandwidthTracker) RecordBandwidthMeasurement(ctx context.Context, userID string, bandwidth int) error
RecordBandwidthMeasurement records a bandwidth measurement sample with CloudWatch integration
func (*BandwidthTracker) TrackBandwidth ¶
func (bt *BandwidthTracker) TrackBandwidth(ctx context.Context, userID string, bytesTransferred int64) error
TrackBandwidth records bandwidth usage for a user
type BaseURL ¶
type BaseURL struct {
Value string `xml:",chardata"`
}
BaseURL represents the base URL for media segments
type BufferHealthSample ¶
type BufferHealthSample struct {
Timestamp time.Time
BufferLevel time.Duration
Quality Quality
Bandwidth int // kbps
}
BufferHealthSample captures buffer health at a point in time
type CostTracker ¶
CostTracker interface for tracking AWS costs
type DASHGenerator ¶
type DASHGenerator struct {
// contains filtered or unexported fields
}
DASHGenerator handles DASH manifest generation
func NewDASHGenerator ¶
func NewDASHGenerator(config *StreamingConfig, storage MediaStorage) *DASHGenerator
NewDASHGenerator creates a new DASH generator
func (*DASHGenerator) GenerateAudioAdaptationSet ¶
func (g *DASHGenerator) GenerateAudioAdaptationSet(audioTracks []AudioTrack) AdaptationSet
GenerateAudioAdaptationSet creates an audio adaptation set
func (*DASHGenerator) GenerateLiveMPD ¶
func (g *DASHGenerator) GenerateLiveMPD(mediaID string, windowSize int) (*DASHManifest, error)
GenerateLiveMPD generates a comprehensive live DASH manifest for real-time streaming
func (*DASHGenerator) GenerateLiveMPDContent ¶
func (g *DASHGenerator) GenerateLiveMPDContent(manifest *DASHManifest) (string, error)
GenerateLiveMPDContent generates the live DASH MPD XML content
func (*DASHGenerator) GenerateLiveMPDContentWithSubtitles ¶
func (g *DASHGenerator) GenerateLiveMPDContentWithSubtitles(manifest *DASHManifest, subtitles []SubtitleTrack) (string, error)
GenerateLiveMPDContentWithSubtitles generates live DASH MPD content with subtitle support
func (*DASHGenerator) GenerateMPD ¶
func (g *DASHGenerator) GenerateMPD(mediaID string, metadata *MediaMetadata) (*DASHManifest, error)
GenerateMPD generates a DASH Media Presentation Description
func (*DASHGenerator) GenerateMPDContent ¶
func (g *DASHGenerator) GenerateMPDContent(manifest *DASHManifest) (string, error)
GenerateMPDContent generates the actual MPD XML content
func (*DASHGenerator) GenerateMPDWithSubtitles ¶
func (g *DASHGenerator) GenerateMPDWithSubtitles(manifest *DASHManifest, subtitles []SubtitleTrack) (string, error)
GenerateMPDWithSubtitles generates an MPD with subtitle tracks
func (*DASHGenerator) ValidateLiveManifest ¶
func (g *DASHGenerator) ValidateLiveManifest(content string) error
ValidateLiveManifest validates a live DASH manifest for compliance
func (*DASHGenerator) ValidateMPD ¶
func (g *DASHGenerator) ValidateMPD(content string) error
ValidateMPD validates a generated MPD
type DASHManifest ¶
type DASHManifest struct {
MediaID string
Duration float64
MinBufferTime float64
Representations []DASHRepresentation
ManifestURL string
GeneratedAt time.Time
CacheDuration time.Duration
// Live streaming specific fields
IsLive bool
AvailabilityStartTime time.Time
PublishTime time.Time
TimeShiftBufferDepth float64 // in seconds
SuggestedPresentationDelay float64 // in seconds
MinimumUpdatePeriod float64 // in seconds
}
DASHManifest represents a DASH MPD (Media Presentation Description)
type DASHRepresentation ¶
type DASHRepresentation struct {
ID string
Quality Quality
Bandwidth int
Width int
Height int
Codecs string
BaseURL string
SegmentTemplate string
}
DASHRepresentation represents a quality representation in DASH
type Event ¶
type Event struct {
SessionID string
UserID string
MediaID string
EventType EventType
Timestamp time.Time
// Event-specific data
Quality Quality
PreviousQuality Quality
Duration time.Duration
BytesLoaded int64
ErrorCode string
BufferHealth float64
Bandwidth int
}
Event represents a streaming event for metrics tracking
type EventType ¶
type EventType string
EventType represents types of streaming events
const ( // EventSessionStart indicates a streaming session has started EventSessionStart EventType = "session_start" // EventQualitySwitch indicates a quality level change EventQualitySwitch EventType = "quality_switch" // EventRebufferStart indicates buffering has started EventRebufferStart EventType = "rebuffer_start" // EventRebufferEnd indicates buffering has ended EventRebufferEnd EventType = "rebuffer_end" // EventSegmentLoad indicates a segment was successfully loaded EventSegmentLoad EventType = "segment_load" // EventSegmentFail indicates a segment failed to load EventSegmentFail EventType = "segment_fail" // EventSessionEnd indicates a streaming session has ended EventSessionEnd EventType = "session_end" )
Streaming event types
type HLSGenerator ¶
type HLSGenerator struct {
// contains filtered or unexported fields
}
HLSGenerator handles HLS manifest generation
func NewHLSGenerator ¶
func NewHLSGenerator(config *StreamingConfig, storage MediaStorage) *HLSGenerator
NewHLSGenerator creates a new HLS generator
func (*HLSGenerator) GenerateIFrameMasterPlaylist ¶
func (g *HLSGenerator) GenerateIFrameMasterPlaylist(manifest *HLSManifest) string
GenerateIFrameMasterPlaylist generates a master playlist with I-frame variants for trick play
func (*HLSGenerator) GenerateIFramePlaylist ¶
func (g *HLSGenerator) GenerateIFramePlaylist(mediaID string, quality Quality, metadata *MediaMetadata) string
GenerateIFramePlaylist generates an I-frame only playlist for trick play with precise keyframe positioning
func (*HLSGenerator) GenerateLivePlaylist ¶
func (g *HLSGenerator) GenerateLivePlaylist(mediaID string, quality Quality, startSegment, windowSize int) string
GenerateLivePlaylist generates a live/sliding window playlist
func (*HLSGenerator) GenerateMasterPlaylist ¶
func (g *HLSGenerator) GenerateMasterPlaylist(mediaID string, metadata *MediaMetadata) (*HLSManifest, error)
GenerateMasterPlaylist generates the HLS master playlist
func (*HLSGenerator) GenerateMasterPlaylistContent ¶
func (g *HLSGenerator) GenerateMasterPlaylistContent(manifest *HLSManifest) string
GenerateMasterPlaylistContent generates the actual M3U8 content for the master playlist
func (*HLSGenerator) GenerateMasterPlaylistWithSubtitles ¶
func (g *HLSGenerator) GenerateMasterPlaylistWithSubtitles(manifest *HLSManifest, subtitles []SubtitleTrack) string
GenerateMasterPlaylistWithSubtitles generates a master playlist including subtitle tracks
func (*HLSGenerator) GenerateVariantPlaylist ¶
func (g *HLSGenerator) GenerateVariantPlaylist(mediaID string, quality Quality, metadata *MediaMetadata) string
GenerateVariantPlaylist generates a variant playlist for a specific quality
func (*HLSGenerator) GenerateWebVTTPlaylist ¶
func (g *HLSGenerator) GenerateWebVTTPlaylist(mediaID string, language string, segments int) string
GenerateWebVTTPlaylist generates a playlist for subtitles/captions
func (*HLSGenerator) ValidatePlaylist ¶
func (g *HLSGenerator) ValidatePlaylist(content string) error
ValidatePlaylist performs basic validation on generated playlists
type HLSManifest ¶
type HLSManifest struct {
MediaID string
Duration float64
Variants []HLSVariant
MasterURL string
GeneratedAt time.Time
CacheDuration time.Duration
}
HLSManifest represents an HLS master playlist
type HLSVariant ¶
type HLSVariant struct {
Quality Quality
Bandwidth int
Resolution string
PlaylistURL string
Codecs string
}
HLSVariant represents a quality variant in HLS
type Keyframe ¶
type Keyframe struct {
PTS float64 // Presentation timestamp in seconds
ByteOffset int64 // Byte offset in the file
ByteLength int64 // Length of the I-frame data in bytes
Duration float64 // Duration until next keyframe
URI string // URI of the segment containing this I-frame
}
Keyframe represents an I-frame position in the media stream
type KeyframeEntry ¶
type KeyframeEntry struct {
PTS float64 `json:"pts"` // Presentation timestamp in seconds
DTS float64 `json:"dts"` // Decode timestamp in seconds
ByteOffset int64 `json:"byte_offset"` // Byte offset in the file
ByteLength int64 `json:"byte_size"` // Size of I-frame data in bytes
FrameNum int `json:"frame_num"` // Frame number
Segment int `json:"segment"` // Which HLS segment contains this keyframe
}
KeyframeEntry represents a single keyframe in JSON format
type KeyframeMetadata ¶
type KeyframeMetadata struct {
MediaID string `json:"media_id"`
Quality string `json:"quality"`
Keyframes []KeyframeEntry `json:"keyframes"`
GOP int `json:"gop_size,omitempty"` // Group of Pictures size
Framerate float64 `json:"framerate,omitempty"` // Video framerate
Duration float64 `json:"duration,omitempty"` // Total duration in seconds
Codec string `json:"codec,omitempty"` // Video codec (H264, H265, etc.)
}
KeyframeMetadata represents JSON keyframe metadata format
type LiveMPD ¶
type LiveMPD struct {
XMLName xml.Name `xml:"MPD"`
XMLNS string `xml:"xmlns,attr"`
Type string `xml:"type,attr"`
AvailabilityStartTime string `xml:"availabilityStartTime,attr"`
PublishTime string `xml:"publishTime,attr"`
MinimumUpdatePeriod string `xml:"minimumUpdatePeriod,attr,omitempty"`
TimeShiftBufferDepth string `xml:"timeShiftBufferDepth,attr,omitempty"`
SuggestedPresentationDelay string `xml:"suggestedPresentationDelay,attr,omitempty"`
MinBufferTime string `xml:"minBufferTime,attr"`
Profiles string `xml:"profiles,attr"`
Periods []Period `xml:"Period"`
}
LiveMPD represents a live Media Presentation Description for DASH streaming
type LiveSegmentTemplate ¶
type LiveSegmentTemplate struct {
Media string `xml:"media,attr"`
Initialization string `xml:"initialization,attr"`
Duration int `xml:"duration,attr,omitempty"`
Timescale int `xml:"timescale,attr"`
AvailabilityTimeOffset float64 `xml:"availabilityTimeOffset,attr,omitempty"`
PresentationTimeOffset int64 `xml:"presentationTimeOffset,attr,omitempty"`
StartNumber int `xml:"startNumber,attr,omitempty"`
}
LiveSegmentTemplate represents a segment template for live DASH streaming
type MPD ¶
type MPD struct {
XMLName xml.Name `xml:"MPD"`
XMLNS string `xml:"xmlns,attr"`
Type string `xml:"type,attr"`
MediaPresentationDuration string `xml:"mediaPresentationDuration,attr"`
MinBufferTime string `xml:"minBufferTime,attr"`
Profiles string `xml:"profiles,attr"`
Periods []Period `xml:"Period"`
}
MPD represents a DASH Media Presentation Description document
type MediaFormat ¶
type MediaFormat = types.MediaFormat
MediaFormat represents streaming media format
type MediaMetadata ¶
type MediaMetadata struct {
MediaID string
OriginalURL string
Duration float64
Width int
Height int
Bitrate int
FileSize int64
ProcessedAt time.Time
AvailableQualities []Quality
Status ProcessingStatus
// Codec information for HLS/DASH manifest generation
VideoCodec string `json:"video_codec,omitempty"` // e.g., "avc1.640028"
AudioCodec string `json:"audio_codec,omitempty"` // e.g., "mp4a.40.2"
VideoProfile string `json:"video_profile,omitempty"` // e.g., "High", "Main", "Baseline"
VideoLevel string `json:"video_level,omitempty"` // e.g., "4.0", "3.1"
QualitySettings map[Quality]QualityCodecInfo `json:"quality_settings,omitempty"` // Per-quality codec info
// Live streaming specific fields
IsLive bool `json:"is_live,omitempty"`
StartTime time.Time `json:"start_time,omitempty"`
// I-frame/keyframe information for trick play
KeyframePositions []float64 `json:"keyframe_positions,omitempty"` // PTS positions of keyframes in seconds
}
MediaMetadata contains metadata about a media file
type MediaSessionRepository ¶
type MediaSessionRepository interface {
CreateSession(ctx context.Context, session *StreamingSession) error
GetSession(ctx context.Context, sessionID string) (*StreamingSession, error)
UpdateSession(ctx context.Context, session *StreamingSession) error
EndSession(ctx context.Context, sessionID string) error
GetUserSessions(ctx context.Context, userID string) ([]*StreamingSession, error)
GetMediaSessions(ctx context.Context, mediaID string, limit int32) ([]*StreamingSession, error)
CleanupExpiredSessions(ctx context.Context, maxAge time.Duration) error
SetSessionTTL(ttl time.Duration)
}
MediaSessionRepository interface for session persistence
type MediaStorage ¶
type MediaStorage interface {
GetManifestPath(mediaID string, format MediaFormat, quality Quality) string
GetSegmentPath(mediaID string, quality Quality, segmentIndex int) string
GetMediaMetadata(mediaID string) (*MediaMetadata, error)
ManifestExists(mediaID string, format MediaFormat) (bool, error)
GetKeyframeData(mediaID string, quality Quality) ([]byte, error)
}
MediaStorage interface for interacting with media files
type MediaStreamer ¶
type MediaStreamer interface {
// Manifest generation
GenerateHLSManifest(mediaID string) (*HLSManifest, error)
GenerateDASHManifest(mediaID string) (*DASHManifest, error)
// Segment management
GetSegmentURL(mediaID string, quality Quality, segment int) (string, error)
GetSegmentURLs(mediaID string, quality Quality, startSegment, count int) ([]string, error)
// Bandwidth tracking
TrackBandwidth(userID string, bytesTransferred int64) error
GetBandwidthStats(userID string) (*BandwidthStats, error)
// Quality selection
GetOptimalQuality(userID string, availableBandwidth int) Quality
GetAvailableQualities(mediaID string) ([]QualityInfo, error)
// Session management
StartSession(userID, mediaID string, format MediaFormat) (*StreamingSession, error)
UpdateSession(sessionID string, quality Quality, segmentIndex int, bytesTransferred int64) error
EndSession(sessionID string) error
GetSession(sessionID string) (*StreamingSession, error)
}
MediaStreamer is the main interface for media streaming functionality
type MetricsTracker ¶
type MetricsTracker struct {
// contains filtered or unexported fields
}
MetricsTracker implements comprehensive streaming metrics tracking with CloudWatch integration
func NewMetricsTracker ¶
func NewMetricsTracker(cloudWatch *cloudwatch.Client, logger *zap.Logger) *MetricsTracker
NewMetricsTracker creates a new metrics tracker with CloudWatch integration
func (*MetricsTracker) Cleanup ¶
func (smt *MetricsTracker) Cleanup(maxAge time.Duration)
Cleanup removes old session metrics to prevent memory leaks
func (*MetricsTracker) EndSession ¶
func (smt *MetricsTracker) EndSession(sessionID string) *SessionMetrics
EndSession finalizes metrics tracking for a session
func (*MetricsTracker) GetSessionMetrics ¶
func (smt *MetricsTracker) GetSessionMetrics(sessionID string) *SessionMetrics
GetSessionMetrics retrieves current metrics for a session
func (*MetricsTracker) StartSession ¶
func (smt *MetricsTracker) StartSession(sessionID, userID, mediaID string)
StartSession initializes metrics tracking for a new session
func (*MetricsTracker) TrackBufferHealth ¶
func (smt *MetricsTracker) TrackBufferHealth(sessionID string, bufferLevel time.Duration, bandwidth int)
TrackBufferHealth records buffer health samples for adaptive bitrate decisions
func (*MetricsTracker) TrackQualitySwitch ¶
func (smt *MetricsTracker) TrackQualitySwitch(sessionID string, fromQuality, toQuality Quality)
TrackQualitySwitch records a quality change event
func (*MetricsTracker) TrackRebufferEvent ¶
func (smt *MetricsTracker) TrackRebufferEvent(sessionID string, duration time.Duration)
TrackRebufferEvent records rebuffering events
func (*MetricsTracker) TrackSegmentFailure ¶
func (smt *MetricsTracker) TrackSegmentFailure(sessionID string, segmentIndex int, errorCode string)
TrackSegmentFailure records segment loading failures
func (*MetricsTracker) TrackSegmentLoad ¶
func (smt *MetricsTracker) TrackSegmentLoad(sessionID string, segmentIndex int, loadTime time.Duration, bytes int64)
TrackSegmentLoad records successful segment loading
type Period ¶
type Period struct {
ID string `xml:"id,attr"`
Start string `xml:"start,attr,omitempty"`
Duration string `xml:"duration,attr,omitempty"`
AdaptationSets []AdaptationSet `xml:"AdaptationSet"`
}
Period represents a period within a DASH manifest
type Preferences ¶
type Preferences struct {
Username string `json:"username"`
DeviceID string `json:"device_id,omitempty"`
DefaultQuality string `json:"default_quality,omitempty"`
AutoQuality bool `json:"auto_quality"`
PreloadNext bool `json:"preload_next"`
DataSaverMode bool `json:"data_saver_mode"`
PreferredCodec string `json:"preferred_codec,omitempty"`
MaxBandwidthMbps int64 `json:"max_bandwidth_mbps,omitempty"`
BufferSizeSeconds int `json:"buffer_size_seconds,omitempty"`
HDREnabled bool `json:"hdr_enabled"`
ColorSpace string `json:"color_space,omitempty"`
SubtitleEnabled bool `json:"subtitle_enabled"`
SubtitleLanguage string `json:"subtitle_language,omitempty"`
AudioDescriptionEnabled bool `json:"audio_description_enabled"`
ClosedCaptionsEnabled bool `json:"closed_captions_enabled"`
Version int `json:"version"`
SchemaVersion int `json:"schema_version"`
}
Preferences represents user streaming preferences for the streamer This is a local type alias to avoid circular imports
type ProcessingStatus ¶
type ProcessingStatus string
ProcessingStatus represents the processing status of media
const ( StatusPending ProcessingStatus = "pending" StatusProcessing ProcessingStatus = "processing" StatusComplete ProcessingStatus = "complete" StatusFailed ProcessingStatus = "failed" )
Processing status constants
type Quality ¶
Quality represents video quality level
func GetQualitiesByBandwidth ¶
GetQualitiesByBandwidth returns qualities that can be supported by the given bandwidth
type QualityCodecInfo ¶
type QualityCodecInfo struct {
VideoCodec string `json:"video_codec"` // H.264 profile/level string like "avc1.640028"
AudioCodec string `json:"audio_codec"` // Audio codec string like "mp4a.40.2"
Bandwidth int `json:"bandwidth"` // Required bandwidth in bps
Width int `json:"width"` // Video width in pixels
Height int `json:"height"` // Video height in pixels
}
QualityCodecInfo contains codec information for a specific quality level
type QualityInfo ¶
type QualityInfo struct {
Quality Quality
Width int
Height int
Bitrate int // in kbps
Bandwidth int // required bandwidth in kbps
Resolution string // e.g., "1920x1080"
}
QualityInfo contains information about a quality level
func GetQualityInfo ¶
func GetQualityInfo(quality Quality) QualityInfo
GetQualityInfo returns quality information for a given quality level
type QualityMetrics ¶
type QualityMetrics struct {
SessionID string
AverageQuality Quality
QualitySwitches int
RebufferEvents int
TimeInEachQuality map[Quality]time.Duration
LastQualityChange time.Time
}
QualityMetrics tracks quality selection metrics
type QualitySelector ¶
type QualitySelector interface {
SelectQuality(bandwidth int, bufferHealth float64, availableQualities []Quality) Quality
UpdateMetrics(sessionID string, rebufferEvents int, qualitySwitches int)
GetQualityMetrics(sessionID string) *QualityMetrics
}
QualitySelector interface for adaptive bitrate selection
type Representation ¶
type Representation struct {
ID string `xml:"id,attr"`
Bandwidth int `xml:"bandwidth,attr"`
Width int `xml:"width,attr"`
Height int `xml:"height,attr"`
Codecs string `xml:"codecs,attr"`
BaseURL BaseURL `xml:"BaseURL"`
SegmentTemplate interface{} `xml:"SegmentTemplate"`
}
Representation represents a single encoded version of the content
type S3MediaStorage ¶
type S3MediaStorage struct {
// contains filtered or unexported fields
}
S3MediaStorage implements MediaStorage using S3 for files and DynamORM for metadata
func NewS3MediaStorage ¶
NewS3MediaStorage creates a new S3-based media storage with DynamORM for metadata
func (*S3MediaStorage) CreateMediaStructure ¶
func (s *S3MediaStorage) CreateMediaStructure(mediaID string, qualities []Quality) error
CreateMediaStructure creates the S3 structure for a new media item with real artifacts
func (*S3MediaStorage) GetCloudFrontDomain ¶
func (s *S3MediaStorage) GetCloudFrontDomain() string
GetCloudFrontDomain returns the configured CloudFront domain
func (*S3MediaStorage) GetKeyframeData ¶
func (s *S3MediaStorage) GetKeyframeData(mediaID string, quality Quality) ([]byte, error)
GetKeyframeData retrieves keyframe/I-frame data for a media item at a specific quality level
func (*S3MediaStorage) GetManifestPath ¶
func (s *S3MediaStorage) GetManifestPath(mediaID string, format MediaFormat, quality Quality) string
GetManifestPath returns the S3 path for a manifest file
func (*S3MediaStorage) GetMediaMetadata ¶
func (s *S3MediaStorage) GetMediaMetadata(mediaID string) (*MediaMetadata, error)
GetMediaMetadata retrieves metadata for a media item
func (*S3MediaStorage) GetPresignedUploadURL ¶
func (s *S3MediaStorage) GetPresignedUploadURL(mediaID string, filename string) (string, error)
GetPresignedUploadURL generates a presigned URL for uploading media
func (*S3MediaStorage) GetSegmentInfo ¶
func (s *S3MediaStorage) GetSegmentInfo(mediaID string, quality Quality, segmentIndex int) (*Segment, error)
GetSegmentInfo retrieves information about a segment
func (*S3MediaStorage) GetSegmentPath ¶
func (s *S3MediaStorage) GetSegmentPath(mediaID string, quality Quality, segmentIndex int) string
GetSegmentPath returns the S3 path for a segment file
func (*S3MediaStorage) IsCloudFrontEnabled ¶
func (s *S3MediaStorage) IsCloudFrontEnabled() bool
IsCloudFrontEnabled returns true if CloudFront is properly configured
func (*S3MediaStorage) ListSegments ¶
func (s *S3MediaStorage) ListSegments(mediaID string, quality Quality) ([]*Segment, error)
ListSegments lists all segments for a quality level
func (*S3MediaStorage) ManifestExists ¶
func (s *S3MediaStorage) ManifestExists(mediaID string, format MediaFormat) (bool, error)
ManifestExists checks if a manifest file exists
func (*S3MediaStorage) SaveManifest ¶
func (s *S3MediaStorage) SaveManifest(mediaID string, format MediaFormat, quality Quality, content string) error
SaveManifest saves a manifest to S3
func (*S3MediaStorage) UpdateMediaMetadata ¶
func (s *S3MediaStorage) UpdateMediaMetadata(mediaID string, metadata *MediaMetadata) error
UpdateMediaMetadata updates the metadata for a media item
type SegmentTemplate ¶
type SegmentTemplate struct {
Media string `xml:"media,attr"`
Initialization string `xml:"initialization,attr"`
StartNumber int `xml:"startNumber,attr"`
Duration int `xml:"duration,attr"`
Timescale int `xml:"timescale,attr"`
}
SegmentTemplate defines how media segments are addressed
type SessionManager ¶
type SessionManager struct {
// contains filtered or unexported fields
}
SessionManager manages streaming sessions Uses DynamoDB TTL for automatic session expiration in serverless environments
func NewSessionManager ¶
func NewSessionManager(repo MediaSessionRepository, logger *zap.Logger, costTracker CostTracker) *SessionManager
NewSessionManager creates a new session manager
func (*SessionManager) CleanupExpiredSessions ¶
CleanupExpiredSessions removes sessions older than the specified duration NOTE: This method is optional in serverless environments. DynamoDB TTL will automatically remove expired sessions. This method can be used for immediate cleanup if needed, but incurs additional DynamoDB scan costs.
func (*SessionManager) CreateSession ¶
func (sm *SessionManager) CreateSession(ctx context.Context, session *StreamingSession) error
CreateSession creates a new streaming session
func (*SessionManager) EndSession ¶
func (sm *SessionManager) EndSession(ctx context.Context, sessionID string) error
EndSession marks a session as ended
func (*SessionManager) GetMediaSessions ¶
func (sm *SessionManager) GetMediaSessions(ctx context.Context, mediaID string, limit int32) ([]*StreamingSession, error)
GetMediaSessions retrieves sessions for a specific media item
func (*SessionManager) GetSession ¶
func (sm *SessionManager) GetSession(ctx context.Context, sessionID string) (*StreamingSession, error)
GetSession retrieves a streaming session
func (*SessionManager) GetUserSessions ¶
func (sm *SessionManager) GetUserSessions(ctx context.Context, userID string) ([]*StreamingSession, error)
GetUserSessions retrieves active sessions for a user
func (*SessionManager) SetSessionTTL ¶
func (sm *SessionManager) SetSessionTTL(ttl time.Duration)
SetSessionTTL configures the TTL for streaming sessions Common values: - 6 hours for media streaming sessions - 24 hours for analytics data - 7 days for historical metrics
Example usage:
sm.SetSessionTTL(6 * time.Hour) // Short-lived media sessions sm.SetSessionTTL(24 * time.Hour) // Analytics data (default) sm.SetSessionTTL(7 * 24 * time.Hour) // Historical metrics
func (*SessionManager) UpdateSession ¶
func (sm *SessionManager) UpdateSession(ctx context.Context, session *StreamingSession) error
UpdateSession updates a streaming session
type SessionMetrics ¶
type SessionMetrics struct {
SessionID string
UserID string
MediaID string
StartTime time.Time
LastUpdate time.Time
// Quality tracking
CurrentQuality Quality
QualitySwitches int
TimeInEachQuality map[Quality]time.Duration
LastQualityChange time.Time
// Rebuffer tracking
RebufferEvents int
TotalRebufferTime time.Duration
LastRebufferTime time.Time
// Performance metrics
StartupTime time.Duration
SegmentSuccessRate float64
TotalSegments int
FailedSegments int
BytesTransferred int64
// Buffer health history
BufferHealthHistory []BufferHealthSample
// QoE calculation
QoEScore float64
LastQoEUpdate time.Time
}
SessionMetrics tracks detailed metrics for a streaming session
type Streamer ¶
type Streamer struct {
// contains filtered or unexported fields
}
Streamer implements the MediaStreamer interface
func NewStreamer ¶
func NewStreamer( config *StreamingConfig, analytics core.RepositoryStorage, s3Client *s3.Client, cloudWatch *cloudwatch.Client, db dynamormCore.DB, logger *zap.Logger, costTracker CostTracker, ) *Streamer
NewStreamer creates a new media streamer
func (*Streamer) EndSession ¶
EndSession ends a streaming session
func (*Streamer) GenerateDASHManifest ¶
func (s *Streamer) GenerateDASHManifest(mediaID string) (*DASHManifest, error)
GenerateDASHManifest generates a DASH manifest for a media item
func (*Streamer) GenerateHLSManifest ¶
func (s *Streamer) GenerateHLSManifest(mediaID string) (*HLSManifest, error)
GenerateHLSManifest generates an HLS manifest for a media item
func (*Streamer) GetAvailableQualities ¶
func (s *Streamer) GetAvailableQualities(mediaID string) ([]QualityInfo, error)
GetAvailableQualities returns available qualities for a media item
func (*Streamer) GetAvailableQualitiesForUser ¶
func (s *Streamer) GetAvailableQualitiesForUser(mediaID, userID string) ([]QualityInfo, error)
GetAvailableQualitiesForUser returns available qualities for a media item filtered by user preferences
func (*Streamer) GetBandwidthStats ¶
func (s *Streamer) GetBandwidthStats(userID string) (*BandwidthStats, error)
GetBandwidthStats retrieves bandwidth statistics
func (*Streamer) GetOptimalQuality ¶
GetOptimalQuality determines the best quality for a user based on preferences and bandwidth
func (*Streamer) GetOptimalQualityWithPreferences ¶
func (s *Streamer) GetOptimalQualityWithPreferences(userID string, availableBandwidth int, _ string, userPreferences *Preferences) Quality
GetOptimalQualityWithPreferences determines the best quality considering user preferences
func (*Streamer) GetSegmentURL ¶
GetSegmentURL returns the URL for a specific segment
func (*Streamer) GetSegmentURLs ¶
func (s *Streamer) GetSegmentURLs(mediaID string, quality Quality, startSegment, count int) ([]string, error)
GetSegmentURLs returns URLs for multiple segments
func (*Streamer) GetSession ¶
func (s *Streamer) GetSession(sessionID string) (*StreamingSession, error)
GetSession retrieves a streaming session
func (*Streamer) SetSessionManager ¶
func (s *Streamer) SetSessionManager(sessionManager *SessionManager)
SetSessionManager sets the session manager (for dependency injection)
func (*Streamer) StartSession ¶
func (s *Streamer) StartSession(userID, mediaID string, format MediaFormat) (*StreamingSession, error)
StartSession starts a new streaming session Sessions are automatically cleaned up after 24 hours using DynamoDB TTL
func (*Streamer) StartSessionWithPreferences ¶
func (s *Streamer) StartSessionWithPreferences(userID, mediaID string, format MediaFormat, _ string, userPreferences *Preferences) (*StreamingSession, error)
StartSessionWithPreferences starts a new streaming session with user preferences
func (*Streamer) TrackBandwidth ¶
TrackBandwidth records bandwidth usage
type StreamingConfig ¶
type StreamingConfig struct {
CDNBaseURL string
S3Bucket string
S3Region string
SegmentDuration int // in seconds
ManifestCacheTTL time.Duration
MaxConcurrentJobs int
EnableCostTracking bool
DefaultQuality Quality
// Bandwidth thresholds for quality selection (in kbps)
Bandwidth4K int
Bandwidth1080p int
Bandwidth720p int
Bandwidth480p int
Bandwidth360p int
Bandwidth240p int
}
StreamingConfig holds configuration for the streaming service
type StreamingError ¶
StreamingError represents an error in streaming operations
func (*StreamingError) Error ¶
func (e *StreamingError) Error() string
type StreamingSession ¶
type StreamingSession = types.StreamingSession
StreamingSession represents an active streaming session