byoc

package
v0.8.10 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 10, 2026 License: MIT Imports: 42 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
// LiveAIAuthWebhookURL is an exported, package-level *url.URL. The
// declaration has no initializer, so it is nil until something assigns it.
// NOTE(review): presumably the auth webhook endpoint that receives an
// AIAuthRequest — confirm against the code that reads it.
var LiveAIAuthWebhookURL *url.URL

Functions

This section is empty.

Types

type AIAuthRequest

// AIAuthRequest is a JSON-serializable payload that identifies a stream
// (name, key, type, query parameters) and carries the gateway host plus the
// WHEP, status and update URLs.
// NOTE(review): presumably the body posted to LiveAIAuthWebhookURL — confirm
// against the caller.
type AIAuthRequest struct {
	// Stream name or stream key
	Stream    string `json:"stream"`
	StreamKey string `json:"stream_key"`

	// Stream type, eg RTMP or WHIP
	Type string `json:"type"`

	// Query parameters that came with the stream, if any.
	// This is the only field tagged omitempty, so it is the only one dropped
	// from the JSON output when empty.
	QueryParams string `json:"query_params,omitempty"`

	// Gateway host, followed by the URL fields serialized as "whep_url",
	// "status_url" and "update_url". None of these use omitempty, so they are
	// emitted even when empty.
	GatewayHost string `json:"gateway_host"`
	WhepURL     string `json:"whep_url"`
	StatusURL   string `json:"status_url"`
	UpdateURL   string `json:"update_url"`
}

type AIAuthResponse

// AIAuthResponse maps the output destination, pipeline selection, stream ID
// and pipeline parameters to JSON via the field tags below.
type AIAuthResponse struct {
	// Where to send the output video
	RTMPOutputURL string `json:"rtmp_output_url"`

	// Name of the pipeline to run
	Pipeline string `json:"pipeline"`

	// ID of the pipeline to run
	PipelineID string `json:"pipeline_id"`

	// ID of the stream
	StreamID string `json:"stream_id"`

	// Parameters for the pipeline. Typed as json.RawMessage, so the value is
	// kept as raw JSON bytes and is not decoded into a Go structure here.
	PipelineParams json.RawMessage `json:"pipeline_parameters"`
	// contains filtered or unexported fields
}

AIAuthResponse contains the configuration parameters for this AI job.

type BYOCGatewayServer

// BYOCGatewayServer is the gateway-side server type. Most of its exported
// methods return an http.Handler (stream start/stop/update/status/data and
// job submission); the rest of its state is unexported.
type BYOCGatewayServer struct {
	// StreamPipelines maps a string key to its *BYOCStreamPipeline.
	// NOTE(review): the key is presumably a stream name or stream ID — confirm.
	StreamPipelines map[string]*BYOCStreamPipeline
	// contains filtered or unexported fields
}

func NewBYOCGatewayServer

func NewBYOCGatewayServer(node *core.LivepeerNode, statusStore StatusStore, whipServer *media.WHIPServer, whepServer *media.WHEPServer, mux *http.ServeMux) *BYOCGatewayServer

NewBYOCGatewayServer creates a new BYOC gateway server instance.

func (*BYOCGatewayServer) LiveErrorEventSender

func (bs *BYOCGatewayServer) LiveErrorEventSender(ctx context.Context, streamID string, event map[string]string) func(err error)

func (*BYOCGatewayServer) Node

func (bsg *BYOCGatewayServer) Node() *core.LivepeerNode

func (*BYOCGatewayServer) SharedBalanceLock

func (bsg *BYOCGatewayServer) SharedBalanceLock() *sync.Mutex

func (*BYOCGatewayServer) StartStream

func (bsg *BYOCGatewayServer) StartStream() http.Handler

func (*BYOCGatewayServer) StartStreamRTMPIngest

func (bsg *BYOCGatewayServer) StartStreamRTMPIngest() http.Handler

StartStreamRTMPIngest handles the request that MediaMTX sends to go-livepeer when an RTMP stream is received.

func (*BYOCGatewayServer) StartStreamWhipIngest

func (bsg *BYOCGatewayServer) StartStreamWhipIngest(whipServer *media.WHIPServer) http.Handler

func (*BYOCGatewayServer) StopStream

func (bsg *BYOCGatewayServer) StopStream() http.Handler

func (*BYOCGatewayServer) StreamData

func (bsg *BYOCGatewayServer) StreamData() http.Handler

func (*BYOCGatewayServer) StreamStatus

func (bsg *BYOCGatewayServer) StreamStatus() http.Handler

func (*BYOCGatewayServer) SubmitJob

func (bsg *BYOCGatewayServer) SubmitJob() http.Handler

SubmitJob is the gateway handler for job requests.

func (*BYOCGatewayServer) UpdateStream

func (bsg *BYOCGatewayServer) UpdateStream() http.Handler

type BYOCOrchestratorServer

// BYOCOrchestratorServer is the orchestrator-side server type. It exposes no
// exported fields; it is used through its exported methods, most of which
// return an http.Handler (job tokens, job processing, stream payment,
// capability registration and stream start/stop/update).
type BYOCOrchestratorServer struct {
	// contains filtered or unexported fields
}

func NewBYOCOrchestratorServer

func NewBYOCOrchestratorServer(node *core.LivepeerNode, orch Orchestrator, trickleSrv *trickle.Server, trickleBasePath string, mux *http.ServeMux) *BYOCOrchestratorServer

func (*BYOCOrchestratorServer) GetJobToken

func (bso *BYOCOrchestratorServer) GetJobToken() http.Handler

func (*BYOCOrchestratorServer) Node

func (*BYOCOrchestratorServer) ProcessJob

func (bso *BYOCOrchestratorServer) ProcessJob() http.Handler

func (*BYOCOrchestratorServer) ProcessStreamPayment

func (bso *BYOCOrchestratorServer) ProcessStreamPayment() http.Handler

func (*BYOCOrchestratorServer) RegisterCapability

func (bs *BYOCOrchestratorServer) RegisterCapability() http.Handler

RegisterCapability is the handler through which a worker registers with the Orchestrator.

func (*BYOCOrchestratorServer) SharedBalanceLock

func (bso *BYOCOrchestratorServer) SharedBalanceLock() *sync.Mutex

func (*BYOCOrchestratorServer) StartStream

func (bso *BYOCOrchestratorServer) StartStream() http.Handler

func (*BYOCOrchestratorServer) StopStream

func (bso *BYOCOrchestratorServer) StopStream() http.Handler

func (*BYOCOrchestratorServer) UnregisterCapability

func (bs *BYOCOrchestratorServer) UnregisterCapability() http.Handler

func (*BYOCOrchestratorServer) UpdateStream

func (bso *BYOCOrchestratorServer) UpdateStream() http.Handler

type BYOCStreamPipeline

// BYOCStreamPipeline groups the per-pipeline state held in
// BYOCGatewayServer.StreamPipelines: request and stream identifiers, raw
// parameter bytes, the pipeline name, a trickle control publisher, two
// callbacks (StopControl, ReportUpdate), an output ring buffer with an
// accompanying sync.Cond, and a Closed flag.
// NOTE(review): presumably OutCond signals readers/writers of OutWriter and
// Closed is set once the pipeline is torn down — confirm against the code
// that mutates these fields; no locking discipline is visible here.
type BYOCStreamPipeline struct {
	RequestID    string
	StreamID     string
	Params       []byte
	Pipeline     string
	ControlPub   *trickle.TricklePublisher
	StopControl  func()
	ReportUpdate func([]byte)
	OutCond      *sync.Cond
	OutWriter    *media.RingBuffer
	Closed       bool

	// DataWriter is a segment writer kept separately from the ring-buffer
	// output above.
	// NOTE(review): presumably used for the data output channel — confirm.
	DataWriter *media.SegmentWriter
	// contains filtered or unexported fields
}

type JobOrchestratorsFilter

// JobOrchestratorsFilter holds two string lists, serialized as "exclude" and
// "include". Both are tagged omitempty, so a nil or zero-length list is left
// out of the JSON output.
// NOTE(review): entries are presumably orchestrator addresses or URLs, and
// the precedence between the two lists is not visible here — confirm.
type JobOrchestratorsFilter struct {
	Exclude []string `json:"exclude,omitempty"`
	Include []string `json:"include,omitempty"`
}

type JobParameters

// JobParameters carries per-job options: an orchestrator filter used on the
// gateway side and three boolean switches used on the orchestrator side.
type JobParameters struct {
	// Gateway
	// NOTE: encoding/json never considers a struct value empty, so omitempty
	// has no effect on this field; "orchestrators" is always emitted.
	Orchestrators JobOrchestratorsFilter `json:"orchestrators,omitempty"` // list of orchestrators to use for the job

	// Orchestrator
	// Each flag is tagged omitempty, so a false value is left out of the JSON
	// output and a missing key decodes as false.
	EnableVideoIngress bool `json:"enable_video_ingress,omitempty"`
	EnableVideoEgress  bool `json:"enable_video_egress,omitempty"`
	EnableDataOutput   bool `json:"enable_data_output,omitempty"`
}

type JobRequest

// JobRequest describes a single job: its ID, the request and parameters
// (both carried as strings), the target capability, the sender and
// signature, and a timeout serialized as "timeout_seconds".
type JobRequest struct {
	ID            string `json:"id"`
	Request       string `json:"request"`
	Parameters    string `json:"parameters"` // additional information for the Gateway to use to select orchestrators or to send to the worker
	Capability    string `json:"capability"`
	CapabilityUrl string `json:"capability_url"` // this is set when verified orch has capability
	Sender        string `json:"sender"`
	Sig           string `json:"sig"`
	Timeout       int    `json:"timeout_seconds"`

	// The two timeouts below carry no JSON tag, so encoding/json would use the
	// Go field names as keys and encode each time.Duration as an integer
	// number of nanoseconds.
	// NOTE(review): presumably these are local settings for orchestrator
	// discovery rather than part of the wire format — confirm.
	OrchSearchTimeout     time.Duration
	OrchSearchRespTimeout time.Duration
}

Core job request structures

type JobRequestDetails

// JobRequestDetails carries a single stream identifier, serialized as
// "stream_id".
// NOTE(review): presumably decoded from the JobRequest.Request string —
// confirm against the caller.
type JobRequestDetails struct {
	StreamId string `json:"stream_id"`
}

type JobSender

// JobSender pairs a sender address with a signature, both carried as strings
// and serialized as "addr" and "sig".
// NOTE(review): the string encoding (e.g. hex) and the message that is
// signed are not visible here — confirm.
type JobSender struct {
	Addr string `json:"addr"`
	Sig  string `json:"sig"`
}

type JobToken

// JobToken bundles sender identity, ticket parameters, balance, price,
// service address and available capacity.
//
// Every tagged field uses omitempty: nil pointers, empty strings and zero
// integers are left out of the JSON output. A Balance or AvailableCapacity of
// 0 is therefore indistinguishable on the wire from an absent value.
type JobToken struct {
	SenderAddress     *JobSender        `json:"sender_address,omitempty"`
	TicketParams      *net.TicketParams `json:"ticket_params,omitempty"`
	Balance           int64             `json:"balance,omitempty"`
	Price             *net.PriceInfo    `json:"price,omitempty"`
	ServiceAddr       string            `json:"service_addr,omitempty"`
	AvailableCapacity int64             `json:"available_capacity,omitempty"`

	// LastNonce has no JSON tag, so encoding/json would emit it under the key
	// "LastNonce" and always include it.
	// NOTE(review): presumably local bookkeeping for ticket nonces — confirm.
	LastNonce uint32
}

func (JobToken) Address

func (jt JobToken) Address() string

func (JobToken) URL

func (jt JobToken) URL() string

type Orchestrator

// Orchestrator declares the orchestrator operations this package relies on:
// the transcoder secret and signature verification, external-capability
// registration/removal and capacity reservation, price and ticket-parameter
// lookup, and balance, payment and fee handling.
type Orchestrator interface {
	TranscoderSecret() string
	VerifySig(addr ethcommon.Address, msg string, sig []byte) bool
	// Capacity for an external capability is reserved here and released by
	// FreeExternalCapabilityCapacity; CheckExternalCapabilityCapacity reports
	// an int64 for the named capability.
	ReserveExternalCapabilityCapacity(capability string) error
	GetUrlForCapability(capability string) string
	JobPriceInfo(sender ethcommon.Address, capability string) (*net.PriceInfo, error)
	TicketParams(sender ethcommon.Address, priceInfo *net.PriceInfo) (*net.TicketParams, error)
	Balance(sender ethcommon.Address, manifestID core.ManifestID) *big.Rat
	CheckExternalCapabilityCapacity(capability string) int64
	RemoveExternalCapability(extCapName string) error
	// extCapSettings is passed as a single string.
	// NOTE(review): presumably a serialized settings document — confirm format.
	RegisterExternalCapability(extCapSettings string) (*core.ExternalCapability, error)
	FreeExternalCapabilityCapacity(capability string) error
	ServiceURI() *url.URL
	ProcessPayment(ctx context.Context, payment net.Payment, manifestID core.ManifestID) error
	// DebitFees has no return value, so it cannot report failure to the caller.
	DebitFees(sender ethcommon.Address, manifestID core.ManifestID, priceInfo *net.PriceInfo, units int64)
}

Orchestrator is a subset of the Orchestrator interface for orchestrator operations needed by BYOC

type OrchestratorSwapper

// OrchestratorSwapper is an exported interface whose method set is not
// visible in this listing (all methods are filtered or unexported).
// NOTE(review): presumably decides when a stream should move to a different
// orchestrator — confirm against the package source.
type OrchestratorSwapper interface {
	// contains filtered or unexported methods
}

type SlowOrchChecker

// SlowOrchChecker has no exported fields; it is used through its exported
// methods BeginSegment, EndSegment and GetCount.
type SlowOrchChecker struct {
	// contains filtered or unexported fields
}

SlowOrchChecker detects 'slow' orchs by keeping track of in-flight segments: it counts the difference between segments produced and segments completed.

func (*SlowOrchChecker) BeginSegment

func (s *SlowOrchChecker) BeginSegment() (int, bool)

Returns the number of segments begun so far and whether the max number of inflight segments was hit. Number of segments is not incremented if inflight max is hit. If inflight max is hit, returns true, false otherwise.

func (*SlowOrchChecker) EndSegment

func (s *SlowOrchChecker) EndSegment()

func (*SlowOrchChecker) GetCount

func (s *SlowOrchChecker) GetCount() int

type StartRequest

// StartRequest is the JSON shape for starting a stream: a stream name, an
// RTMP output, a stream ID and a params string. No field uses omitempty, so
// all four keys are always emitted.
type StartRequest struct {
	// Note the JSON key is "stream_name", not "stream".
	Stream     string `json:"stream_name"`
	RtmpOutput string `json:"rtmp_output"`
	StreamId   string `json:"stream_id"`
	Params     string `json:"params"`
}

type StatusStore

// StatusStore abstracts a per-stream status store keyed by stream ID. A
// stream's status is a map[string]interface{}; individual entries can also be
// written by key.
type StatusStore interface {
	// Store takes a whole status map for streamID.
	// NOTE(review): whether it replaces or merges with existing data is not
	// visible here — confirm.
	Store(streamID string, status map[string]interface{})
	// StoreKey takes a single key/value for streamID.
	StoreKey(streamID, key string, status interface{})
	Clear(streamID string)
	// Get returns the status map and a bool.
	// NOTE(review): the bool presumably reports whether the stream was found.
	Get(streamID string) (map[string]interface{}, bool)
	// StoreIfNotExists takes the same arguments as StoreKey.
	// NOTE(review): by its name it writes only when the key is absent — confirm.
	StoreIfNotExists(streamID string, key string, status interface{})
}

StatusStore is the interface for interacting with the streamStatusStore that is passed to BYOCGatewayServer at initialization.

type StreamUrls

// StreamUrls bundles a stream ID with the URLs associated with it: WHIP,
// WHEP, RTMP, RTMP output, update, status, data and stop. No field uses
// omitempty, so every key is emitted in JSON even when its value is empty.
type StreamUrls struct {
	StreamId      string `json:"stream_id"`
	WhipUrl       string `json:"whip_url"`
	WhepUrl       string `json:"whep_url"`
	RtmpUrl       string `json:"rtmp_url"`
	RtmpOutputUrl string `json:"rtmp_output_url"`
	UpdateUrl     string `json:"update_url"`
	StatusUrl     string `json:"status_url"`
	DataUrl       string `json:"data_url"`
	StopUrl       string `json:"stop_url"`
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL