Documentation
¶
Index ¶
- Variables
- type AIAuthRequest
- type AIAuthResponse
- type BYOCGatewayServer
- func (bs *BYOCGatewayServer) LiveErrorEventSender(ctx context.Context, streamID string, event map[string]string) func(err error)
- func (bsg *BYOCGatewayServer) Node() *core.LivepeerNode
- func (bsg *BYOCGatewayServer) SharedBalanceLock() *sync.Mutex
- func (bsg *BYOCGatewayServer) StartStream() http.Handler
- func (bsg *BYOCGatewayServer) StartStreamRTMPIngest() http.Handler
- func (bsg *BYOCGatewayServer) StartStreamWhipIngest(whipServer *media.WHIPServer) http.Handler
- func (bsg *BYOCGatewayServer) StopStream() http.Handler
- func (bsg *BYOCGatewayServer) StreamData() http.Handler
- func (bsg *BYOCGatewayServer) StreamStatus() http.Handler
- func (bsg *BYOCGatewayServer) SubmitJob() http.Handler
- func (bsg *BYOCGatewayServer) UpdateStream() http.Handler
- type BYOCOrchestratorServer
- func (bso *BYOCOrchestratorServer) GetJobToken() http.Handler
- func (bso *BYOCOrchestratorServer) Node() *core.LivepeerNode
- func (bso *BYOCOrchestratorServer) ProcessJob() http.Handler
- func (bso *BYOCOrchestratorServer) ProcessStreamPayment() http.Handler
- func (bs *BYOCOrchestratorServer) RegisterCapability() http.Handler
- func (bso *BYOCOrchestratorServer) SharedBalanceLock() *sync.Mutex
- func (bso *BYOCOrchestratorServer) StartStream() http.Handler
- func (bso *BYOCOrchestratorServer) StopStream() http.Handler
- func (bs *BYOCOrchestratorServer) UnregisterCapability() http.Handler
- func (bso *BYOCOrchestratorServer) UpdateStream() http.Handler
- type BYOCStreamPipeline
- type JobOrchestratorsFilter
- type JobParameters
- type JobRequest
- type JobRequestDetails
- type JobSender
- type JobToken
- type Orchestrator
- type OrchestratorSwapper
- type SlowOrchChecker
- type StartRequest
- type StatusStore
- type StreamUrls
Constants ¶
This section is empty.
Variables ¶
var LiveAIAuthWebhookURL *url.URL
Functions ¶
This section is empty.
Types ¶
type AIAuthRequest ¶
type AIAuthRequest struct {
// Stream name or stream key
Stream string `json:"stream"`
StreamKey string `json:"stream_key"`
// Stream type, eg RTMP or WHIP
Type string `json:"type"`
// Query parameters that came with the stream, if any
QueryParams string `json:"query_params,omitempty"`
// Gateway host
GatewayHost string `json:"gateway_host"`
WhepURL string `json:"whep_url"`
StatusURL string `json:"status_url"`
UpdateURL string `json:"update_url"`
}
type AIAuthResponse ¶
type AIAuthResponse struct {
// Where to send the output video
RTMPOutputURL string `json:"rtmp_output_url"`
// Name of the pipeline to run
Pipeline string `json:"pipeline"`
// ID of the pipeline to run
PipelineID string `json:"pipeline_id"`
// ID of the stream
StreamID string `json:"stream_id"`
// Parameters for the pipeline
PipelineParams json.RawMessage `json:"pipeline_parameters"`
// contains filtered or unexported fields
}
Contains the configuration parameters for this AI job
type BYOCGatewayServer ¶
type BYOCGatewayServer struct {
StreamPipelines map[string]*BYOCStreamPipeline
// contains filtered or unexported fields
}
func NewBYOCGatewayServer ¶
func NewBYOCGatewayServer(node *core.LivepeerNode, statusStore StatusStore, whipServer *media.WHIPServer, whepServer *media.WHEPServer, mux *http.ServeMux) *BYOCGatewayServer
NewBYOCGatewayServer creates a new BYOC gateway server instance.
func (*BYOCGatewayServer) LiveErrorEventSender ¶
func (*BYOCGatewayServer) Node ¶
func (bsg *BYOCGatewayServer) Node() *core.LivepeerNode
func (*BYOCGatewayServer) SharedBalanceLock ¶
func (bsg *BYOCGatewayServer) SharedBalanceLock() *sync.Mutex
func (*BYOCGatewayServer) StartStream ¶
func (bsg *BYOCGatewayServer) StartStream() http.Handler
func (*BYOCGatewayServer) StartStreamRTMPIngest ¶
func (bsg *BYOCGatewayServer) StartStreamRTMPIngest() http.Handler
mediamtx sends this request to go-livepeer when an RTMP stream is received.
func (*BYOCGatewayServer) StartStreamWhipIngest ¶
func (bsg *BYOCGatewayServer) StartStreamWhipIngest(whipServer *media.WHIPServer) http.Handler
func (*BYOCGatewayServer) StopStream ¶
func (bsg *BYOCGatewayServer) StopStream() http.Handler
func (*BYOCGatewayServer) StreamData ¶
func (bsg *BYOCGatewayServer) StreamData() http.Handler
func (*BYOCGatewayServer) StreamStatus ¶
func (bsg *BYOCGatewayServer) StreamStatus() http.Handler
func (*BYOCGatewayServer) SubmitJob ¶
func (bsg *BYOCGatewayServer) SubmitJob() http.Handler
Gateway handler for job request
func (*BYOCGatewayServer) UpdateStream ¶
func (bsg *BYOCGatewayServer) UpdateStream() http.Handler
type BYOCOrchestratorServer ¶
type BYOCOrchestratorServer struct {
// contains filtered or unexported fields
}
func NewBYOCOrchestratorServer ¶
func NewBYOCOrchestratorServer(node *core.LivepeerNode, orch Orchestrator, trickleSrv *trickle.Server, trickleBasePath string, mux *http.ServeMux) *BYOCOrchestratorServer
func (*BYOCOrchestratorServer) GetJobToken ¶
func (bso *BYOCOrchestratorServer) GetJobToken() http.Handler
func (*BYOCOrchestratorServer) Node ¶
func (bso *BYOCOrchestratorServer) Node() *core.LivepeerNode
func (*BYOCOrchestratorServer) ProcessJob ¶
func (bso *BYOCOrchestratorServer) ProcessJob() http.Handler
func (*BYOCOrchestratorServer) ProcessStreamPayment ¶
func (bso *BYOCOrchestratorServer) ProcessStreamPayment() http.Handler
func (*BYOCOrchestratorServer) RegisterCapability ¶
func (bs *BYOCOrchestratorServer) RegisterCapability() http.Handler
A worker registers with the Orchestrator.
func (*BYOCOrchestratorServer) SharedBalanceLock ¶
func (bso *BYOCOrchestratorServer) SharedBalanceLock() *sync.Mutex
func (*BYOCOrchestratorServer) StartStream ¶
func (bso *BYOCOrchestratorServer) StartStream() http.Handler
func (*BYOCOrchestratorServer) StopStream ¶
func (bso *BYOCOrchestratorServer) StopStream() http.Handler
func (*BYOCOrchestratorServer) UnregisterCapability ¶
func (bs *BYOCOrchestratorServer) UnregisterCapability() http.Handler
func (*BYOCOrchestratorServer) UpdateStream ¶
func (bso *BYOCOrchestratorServer) UpdateStream() http.Handler
type BYOCStreamPipeline ¶
type BYOCStreamPipeline struct {
RequestID string
StreamID string
Params []byte
Pipeline string
ControlPub *trickle.TricklePublisher
StopControl func()
ReportUpdate func([]byte)
OutCond *sync.Cond
OutWriter *media.RingBuffer
Closed bool
DataWriter *media.SegmentWriter
// contains filtered or unexported fields
}
type JobOrchestratorsFilter ¶
type JobParameters ¶
type JobParameters struct {
// Gateway
Orchestrators JobOrchestratorsFilter `json:"orchestrators,omitempty"` // list of orchestrators to use for the job
// Orchestrator
EnableVideoIngress bool `json:"enable_video_ingress,omitempty"`
EnableVideoEgress bool `json:"enable_video_egress,omitempty"`
EnableDataOutput bool `json:"enable_data_output,omitempty"`
}
type JobRequest ¶
type JobRequest struct {
ID string `json:"id"`
Request string `json:"request"`
Parameters string `json:"parameters"` // additional information for the Gateway to use to select orchestrators or to send to the worker
Capability string `json:"capability"`
CapabilityUrl string `json:"capability_url"` // this is set when verified orch has capability
Sender string `json:"sender"`
Sig string `json:"sig"`
Timeout int `json:"timeout_seconds"`
OrchSearchTimeout time.Duration
OrchSearchRespTimeout time.Duration
}
Core job request structures
type JobRequestDetails ¶
type JobRequestDetails struct {
StreamId string `json:"stream_id"`
}
type JobToken ¶
type JobToken struct {
SenderAddress *JobSender `json:"sender_address,omitempty"`
TicketParams *net.TicketParams `json:"ticket_params,omitempty"`
Balance int64 `json:"balance,omitempty"`
Price *net.PriceInfo `json:"price,omitempty"`
ServiceAddr string `json:"service_addr,omitempty"`
AvailableCapacity int64 `json:"available_capacity,omitempty"`
LastNonce uint32
}
type Orchestrator ¶
type Orchestrator interface {
TranscoderSecret() string
VerifySig(addr ethcommon.Address, msg string, sig []byte) bool
ReserveExternalCapabilityCapacity(capability string) error
GetUrlForCapability(capability string) string
JobPriceInfo(sender ethcommon.Address, capability string) (*net.PriceInfo, error)
TicketParams(sender ethcommon.Address, priceInfo *net.PriceInfo) (*net.TicketParams, error)
Balance(sender ethcommon.Address, manifestID core.ManifestID) *big.Rat
CheckExternalCapabilityCapacity(capability string) int64
RemoveExternalCapability(extCapName string) error
RegisterExternalCapability(extCapSettings string) (*core.ExternalCapability, error)
FreeExternalCapabilityCapacity(capability string) error
ServiceURI() *url.URL
ProcessPayment(ctx context.Context, payment net.Payment, manifestID core.ManifestID) error
DebitFees(sender ethcommon.Address, manifestID core.ManifestID, priceInfo *net.PriceInfo, units int64)
}
Orchestrator is a subset of the Orchestrator interface for orchestrator operations needed by BYOC
type OrchestratorSwapper ¶
type OrchestratorSwapper interface {
// contains filtered or unexported methods
}
type SlowOrchChecker ¶
type SlowOrchChecker struct {
// contains filtered or unexported fields
}
Detect 'slow' orchs by keeping track of in-flight segments. Count the difference between segments produced and segments completed.
func (*SlowOrchChecker) BeginSegment ¶
func (s *SlowOrchChecker) BeginSegment() (int, bool)
Returns the number of segments begun so far and whether the max number of inflight segments was hit. Number of segments is not incremented if inflight max is hit. If inflight max is hit, returns true, false otherwise.
func (*SlowOrchChecker) EndSegment ¶
func (s *SlowOrchChecker) EndSegment()
func (*SlowOrchChecker) GetCount ¶
func (s *SlowOrchChecker) GetCount() int
type StartRequest ¶
type StatusStore ¶
type StatusStore interface {
Store(streamID string, status map[string]interface{})
StoreKey(streamID, key string, status interface{})
Clear(streamID string)
Get(streamID string) (map[string]interface{}, bool)
StoreIfNotExists(streamID string, key string, status interface{})
}
Interface for interacting with the streamStatusStore; an implementation is passed to BYOCGatewayServer at initialization.
type StreamUrls ¶
type StreamUrls struct {
StreamId string `json:"stream_id"`
WhipUrl string `json:"whip_url"`
WhepUrl string `json:"whep_url"`
RtmpUrl string `json:"rtmp_url"`
RtmpOutputUrl string `json:"rtmp_output_url"`
UpdateUrl string `json:"update_url"`
StatusUrl string `json:"status_url"`
DataUrl string `json:"data_url"`
StopUrl string `json:"stop_url"`
}