worker

package
v0.0.7 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 24, 2026 License: MIT Imports: 31 Imported by: 0

Documentation

Index

Constants

View Source
const PreConnectAudioBufferStream = "lk.agent.pre-connect-audio-buffer"

Variables

This section is empty.

Functions

This section is empty.

Types

type AgentServer

type AgentServer struct {
	Options WorkerOptions
	// contains filtered or unexported fields
}

func NewAgentServer

func NewAgentServer(opts WorkerOptions) *AgentServer

func (*AgentServer) CurrentCPULoad added in v0.0.6

func (s *AgentServer) CurrentCPULoad() float64

CurrentCPULoad returns the most recent CPU usage as a fraction in [0.0, 1.0]. Returns math.NaN() until the first sample is available (~5 s after startup).

func (*AgentServer) Drain added in v0.0.5

func (s *AgentServer) Drain(ctx context.Context) error

Drain stops the worker from accepting new jobs and waits for existing ones to finish.

func (*AgentServer) ExecuteLocalJob

func (s *AgentServer) ExecuteLocalJob(ctx context.Context, roomName string, participantIdentity string) error

ExecuteLocalJob runs a job locally without connecting to the worker service, useful for the CLI console

func (*AgentServer) GetConsoleSession

func (s *AgentServer) GetConsoleSession() any

GetConsoleSession retrieves the active local console session

func (*AgentServer) GetEntrypointFunc added in v0.0.5

func (s *AgentServer) GetEntrypointFunc() func(*JobContext) error

GetEntrypointFunc retrieves the registered entrypoint function (for console mode)

func (*AgentServer) NumActiveJobs added in v0.0.6

func (s *AgentServer) NumActiveJobs() int

func (*AgentServer) RTCSession

func (s *AgentServer) RTCSession(
	entrypoint func(*JobContext) error,
	request func(*JobRequest) error,
	sessionEnd func(*JobContext) error,
)

func (*AgentServer) Run

func (s *AgentServer) Run(ctx context.Context) error

func (*AgentServer) SetConsoleSession

func (s *AgentServer) SetConsoleSession(session any)

SetConsoleSession allows entrypoints to register their session for console interaction

type AudioDecoder

type AudioDecoder interface {
	Decode(data []byte) ([]byte, error)
	Close() error
}

type AudioEncoder

type AudioEncoder interface {
	Encode(pcm []byte) ([]byte, error)
	SampleRate() int
	Channels() int
	Close() error
}

type AudioInputOptions added in v0.0.5

type AudioInputOptions struct {
	Enabled                bool
	SampleRate             int
	NumChannels            int
	FrameSizeMs            int
	PreConnectAudio        bool
	PreConnectAudioTimeout time.Duration
}

type AudioOutputOptions added in v0.0.5

type AudioOutputOptions struct {
	Enabled             bool
	SampleRate          int
	NumChannels         int
	TrackName           string
	TrackPublishOptions *lksdk.TrackPublicationOptions
}

type JobAcceptArguments

type JobAcceptArguments struct {
	Name       string
	Identity   string
	Metadata   string
	Attributes map[string]string
}

type JobContext

type JobContext struct {
	Job    *livekit.Job
	Room   *lksdk.Room
	Report *agent.SessionReport

	APIKey    string
	APISecret string
	URL       string
}

func NewJobContext

func NewJobContext(job *livekit.Job, url string, apiKey string, apiSecret string) *JobContext

func (*JobContext) AddSIPParticipant

func (c *JobContext) AddSIPParticipant(ctx context.Context, callTo string, trunkID string, identity string, name string) (*livekit.SIPParticipantInfo, error)

AddSIPParticipant adds a SIP participant to the room.

func (*JobContext) Connect

func (c *JobContext) Connect(ctx context.Context, cb *lksdk.RoomCallback) error

func (*JobContext) DeleteRoom

func (c *JobContext) DeleteRoom(ctx context.Context, roomName string) (*livekit.DeleteRoomResponse, error)

DeleteRoom deletes the room and disconnects all participants.

func (*JobContext) Shutdown

func (c *JobContext) Shutdown(reason string)

func (*JobContext) TransferSIPParticipant

func (c *JobContext) TransferSIPParticipant(ctx context.Context, identity string, transferTo string, playDialtone bool) error

TransferSIPParticipant transfers a SIP participant to another number.

type JobRequest

type JobRequest struct {
	Job *livekit.Job
	// contains filtered or unexported fields
}

func (*JobRequest) Accept

func (r *JobRequest) Accept(args JobAcceptArguments) error

func (*JobRequest) Reject

func (r *JobRequest) Reject() error

type PreConnectAudioBuffer

type PreConnectAudioBuffer struct {
	Timestamp time.Time
	Frames    []*model.AudioFrame
}

type PreConnectAudioHandler

type PreConnectAudioHandler struct {
	// contains filtered or unexported fields
}

func NewPreConnectAudioHandler

func NewPreConnectAudioHandler(room *lksdk.Room, timeout time.Duration) *PreConnectAudioHandler

func (*PreConnectAudioHandler) Close added in v0.0.5

func (h *PreConnectAudioHandler) Close()

func (*PreConnectAudioHandler) Register

func (h *PreConnectAudioHandler) Register()

func (*PreConnectAudioHandler) WaitForData

func (h *PreConnectAudioHandler) WaitForData(ctx context.Context, trackID string) []*model.AudioFrame

type RecorderAudioInput added in v0.0.5

type RecorderAudioInput struct {
	// contains filtered or unexported fields
}

func NewRecorderAudioInput added in v0.0.5

func NewRecorderAudioInput(recordingIO *RecorderIO, source agent.AudioInput) *RecorderAudioInput

func (*RecorderAudioInput) Label added in v0.0.5

func (r *RecorderAudioInput) Label() string

func (*RecorderAudioInput) OnAttached added in v0.0.5

func (r *RecorderAudioInput) OnAttached()

func (*RecorderAudioInput) OnDetached added in v0.0.5

func (r *RecorderAudioInput) OnDetached()

func (*RecorderAudioInput) Stream added in v0.0.5

func (r *RecorderAudioInput) Stream() <-chan *model.AudioFrame

type RecorderAudioOutput added in v0.0.5

type RecorderAudioOutput struct {
	// contains filtered or unexported fields
}

func NewRecorderAudioOutput added in v0.0.5

func NewRecorderAudioOutput(recordingIO *RecorderIO, nextInChain agent.AudioOutput, writeCb func([]*model.AudioFrame)) *RecorderAudioOutput

func (*RecorderAudioOutput) CaptureFrame added in v0.0.5

func (r *RecorderAudioOutput) CaptureFrame(frame *model.AudioFrame) error

func (*RecorderAudioOutput) ClearBuffer added in v0.0.5

func (r *RecorderAudioOutput) ClearBuffer()

func (*RecorderAudioOutput) Flush added in v0.0.5

func (r *RecorderAudioOutput) Flush()

func (*RecorderAudioOutput) Label added in v0.0.5

func (r *RecorderAudioOutput) Label() string

func (*RecorderAudioOutput) OnAttached added in v0.0.5

func (r *RecorderAudioOutput) OnAttached()

func (*RecorderAudioOutput) OnDetached added in v0.0.5

func (r *RecorderAudioOutput) OnDetached()

func (*RecorderAudioOutput) OnPlaybackFinished added in v0.0.5

func (r *RecorderAudioOutput) OnPlaybackFinished(cb func(ev agent.PlaybackFinishedEvent))

func (*RecorderAudioOutput) OnPlaybackStarted added in v0.0.5

func (r *RecorderAudioOutput) OnPlaybackStarted(cb func(ev agent.PlaybackStartedEvent))

func (*RecorderAudioOutput) Pause added in v0.0.5

func (r *RecorderAudioOutput) Pause()

func (*RecorderAudioOutput) Resume added in v0.0.5

func (r *RecorderAudioOutput) Resume()

func (*RecorderAudioOutput) WaitForPlayout added in v0.0.5

func (r *RecorderAudioOutput) WaitForPlayout(ctx context.Context) error

type RecorderIO

type RecorderIO struct {
	Session *agent.AgentSession

	OutPath string
	// contains filtered or unexported fields
}

RecorderIO records a conversation as a stereo WAV file. Left channel = user (input), Right channel = agent (output).

func NewRecorderIO

func NewRecorderIO(session *agent.AgentSession) *RecorderIO

func (*RecorderIO) RecordInput

func (r *RecorderIO) RecordInput(source agent.AudioInput) *RecorderAudioInput

func (*RecorderIO) RecordOutput

func (r *RecorderIO) RecordOutput(next agent.AudioOutput) *RecorderAudioOutput

func (*RecorderIO) Start

func (r *RecorderIO) Start(outputPath string, sampleRate int) error

Start begins recording to a stereo WAV file at the given sample rate.

func (*RecorderIO) Stop

func (r *RecorderIO) Stop() error

Stop signals the record loop to flush and close, then waits for it to finish.

type RoomIO

type RoomIO struct {
	Room         *lksdk.Room
	AgentSession *agent.AgentSession
	Options      RoomOptions
	Recorder     *RecorderIO
	// contains filtered or unexported fields
}

func NewRoomIO

func NewRoomIO(room *lksdk.Room, session *agent.AgentSession, opts RoomOptions) *RoomIO

func (*RoomIO) CaptureFrame added in v0.0.5

func (rio *RoomIO) CaptureFrame(frame *model.AudioFrame) error

func (*RoomIO) CaptureVideoFrame added in v0.0.5

func (rio *RoomIO) CaptureVideoFrame(frame *model.VideoFrame) error

--- agent.VideoOutput Implementation ---

func (*RoomIO) ClearBuffer added in v0.0.5

func (rio *RoomIO) ClearBuffer()

func (*RoomIO) Close

func (rio *RoomIO) Close() error

func (*RoomIO) Flush added in v0.0.5

func (rio *RoomIO) Flush()

func (*RoomIO) GetCallback

func (rio *RoomIO) GetCallback() *lksdk.RoomCallback

func (*RoomIO) Identity added in v0.0.5

func (rio *RoomIO) Identity() string

--- agent.MediaPublisher Implementation ---

func (*RoomIO) Label added in v0.0.5

func (rio *RoomIO) Label() string

--- agent.AudioInput Implementation ---

func (*RoomIO) OnAttached added in v0.0.5

func (rio *RoomIO) OnAttached()

func (*RoomIO) OnDetached added in v0.0.5

func (rio *RoomIO) OnDetached()

func (*RoomIO) OnPlaybackFinished added in v0.0.5

func (rio *RoomIO) OnPlaybackFinished(f func(ev agent.PlaybackFinishedEvent))

func (*RoomIO) OnPlaybackStarted added in v0.0.5

func (rio *RoomIO) OnPlaybackStarted(f func(ev agent.PlaybackStartedEvent))

func (*RoomIO) Pause added in v0.0.5

func (rio *RoomIO) Pause()

func (*RoomIO) PublishData added in v0.0.5

func (rio *RoomIO) PublishData(data []byte, topic string, destinationSIDs []string) error

func (*RoomIO) Resume added in v0.0.5

func (rio *RoomIO) Resume()

func (*RoomIO) SetAttributes added in v0.0.5

func (rio *RoomIO) SetAttributes(attrs map[string]string) error

func (*RoomIO) SetParticipant added in v0.0.5

func (rio *RoomIO) SetParticipant(identity string)

func (*RoomIO) Start

func (rio *RoomIO) Start(ctx context.Context) error

func (*RoomIO) Stream added in v0.0.5

func (rio *RoomIO) Stream() <-chan *model.AudioFrame

func (*RoomIO) UnsetParticipant added in v0.0.5

func (rio *RoomIO) UnsetParticipant()

func (*RoomIO) WaitForPlayout added in v0.0.5

func (rio *RoomIO) WaitForPlayout(ctx context.Context) error

type RoomOptions

type RoomOptions struct {
	AudioInput          *AudioInputOptions
	AudioOutput         *AudioOutputOptions
	VideoInput          *VideoInputOptions
	VideoOutput         *VideoOutputOptions
	TextInput           *TextInputOptions
	TextOutput          *TextOutputOptions
	ParticipantKinds    []lksdk.ParticipantKind
	ParticipantIdentity string
	CloseOnDisconnect   bool
	DeleteRoomOnClose   bool
	JobContext          *JobContext // Used for room deletion if DeleteRoomOnClose is true
}

type RoomTextOutput added in v0.0.5

type RoomTextOutput struct {
	// contains filtered or unexported fields
}

func NewRoomTextOutput added in v0.0.5

func NewRoomTextOutput(room *lksdk.Room, participantIdentity string, url, apiKey, apiSecret string) *RoomTextOutput

func (*RoomTextOutput) CaptureText added in v0.0.5

func (t *RoomTextOutput) CaptureText(text string) error

func (*RoomTextOutput) Close added in v0.0.5

func (t *RoomTextOutput) Close()

func (*RoomTextOutput) Flush added in v0.0.5

func (t *RoomTextOutput) Flush()

func (*RoomTextOutput) Label added in v0.0.5

func (t *RoomTextOutput) Label() string

func (*RoomTextOutput) OnAttached added in v0.0.5

func (t *RoomTextOutput) OnAttached()

func (*RoomTextOutput) OnDetached added in v0.0.5

func (t *RoomTextOutput) OnDetached()

func (*RoomTextOutput) SetSegmentID added in v0.0.5

func (t *RoomTextOutput) SetSegmentID(id string)

func (*RoomTextOutput) SetTrackID added in v0.0.5

func (t *RoomTextOutput) SetTrackID(id string)

type RoomVideoInput added in v0.0.5

type RoomVideoInput struct {
	// contains filtered or unexported fields
}

func NewRoomVideoInput added in v0.0.5

func NewRoomVideoInput(rio *RoomIO) *RoomVideoInput

func (*RoomVideoInput) Label added in v0.0.5

func (rvi *RoomVideoInput) Label() string

func (*RoomVideoInput) OnAttached added in v0.0.5

func (rvi *RoomVideoInput) OnAttached()

func (*RoomVideoInput) OnDetached added in v0.0.5

func (rvi *RoomVideoInput) OnDetached()

func (*RoomVideoInput) Stream added in v0.0.5

func (rvi *RoomVideoInput) Stream() <-chan *model.VideoFrame

type TextInputOptions added in v0.0.5

type TextInputOptions struct {
	Enabled      bool
	InputHandler func(s *agent.AgentSession, text string) error
}

type TextOutputOptions added in v0.0.5

type TextOutputOptions struct {
	Enabled                  bool
	SyncTranscription        bool
	TranscriptionSpeedFactor float64
}

type VideoInputOptions added in v0.0.5

type VideoInputOptions struct {
	Enabled bool
}

type VideoOutputOptions added in v0.0.5

type VideoOutputOptions struct {
	Enabled bool
}

type WorkerOptions

type WorkerOptions struct {
	AgentName           string
	WorkerType          WorkerType
	MaxRetry            int
	LoadFn              func(*AgentServer) float64
	WSRL                string
	APIKey              string
	APISecret           string
	HTTPProxy           string
	JobMemoryWarnMB     float64
	JobMemoryLimitMB    float64
	NumIdleProcesses    int
	DrainTimeoutSeconds int
}

type WorkerType

type WorkerType string
const (
	WorkerTypeRoom      WorkerType = "room"
	WorkerTypePublisher WorkerType = "publisher"
)

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL